spark 获取广播变量_spark使用广播变量

import java.io.{File, FileReader}

import java.util

import org.apache.spark.SparkConf

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
 * Example job that filters sensitive words out of a text file using a Spark
 * broadcast variable.
 *
 * The list of sensitive words is read once on the driver, broadcast, and every
 * space-separated token of the input file that is not in that list is printed.
 */
object SparkTest {

  /** Local file holding the space-separated sensitive words to drop. */
  private val FilterDataPath = "G:\\tmp\\b.txt"

  /** Text file whose words are checked against the sensitive-word list. */
  private val SourceDataPath = "G:\\tmp\\a.txt"

  /**
   * Entry point: builds a session with master `local[2]`, runs [[compute]] and
   * stops the session afterwards.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Stop the session even when the job fails, so the context is not left running.
    try compute(spark)
    finally spark.stop()
  }

  /**
   * Loads the sensitive words, broadcasts them, and prints every word of the
   * source file that is not in the list.
   *
   * Lines are split on a single space, so consecutive spaces or blank lines
   * yield empty tokens; these are treated like any other word.
   *
   * @param spark active session whose `sparkContext` runs the job
   */
  def compute(spark: SparkSession): Unit = {
    // A Set gives constant-time membership checks; the lookup runs once per input word.
    val blockedWords: Broadcast[Set[String]] =
      spark.sparkContext.broadcast(loadFilterWords(FilterDataPath))

    spark.sparkContext
      .textFile(SourceDataPath)
      .mapPartitions { lines =>
        // Read the broadcast value once per partition rather than once per word.
        val blocked = blockedWords.value
        // Stay lazy: transform the partition iterator instead of buffering it in memory.
        lines.flatMap(_.split(" ").iterator).filterNot(blocked.contains)
      }
      .collect()
      .foreach(println)
  }

  /**
   * Reads a local text file and returns the distinct space-separated tokens in it.
   *
   * The file is decoded as UTF-8 so its words compare equal to the strings
   * produced by `SparkContext.textFile`, which expects UTF-8 input.
   *
   * @param path location of the word-list file on the driver's file system
   * @return the set of tokens found in the file
   */
  private def loadFilterWords(path: String): Set[String] = {
    import java.io.{BufferedReader, FileInputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets

    val reader = new BufferedReader(
      new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))
    try {
      // readLine() returns null at end of stream; that is the reliable EOF signal
      // (ready() only reports whether a read would block).
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .flatMap(_.split(" ").iterator)
        .toSet
    } finally reader.close()
  }
}


版权声明:本文为weixin_36344862原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。