当函数闭包被传递到远端集群节点上时,它是一个副本。它使用的所有变量在每个节点上都复制了一份,这些变量的修改都不会被传回Driver程序。为了跨Task读写共享变量,所以Spark引入了广播变量、累加器。
广播变量
广播变量让每个节点持有一个只读变量的值,而不是让每个Task都有一份,节省空间。跨多个stage使用相同的数据时,广播变量很有用。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
累加器
累加器支持在并行执行的Task中进行add操作,以获取相关的聚合值,一般用于计数或者求和。
// 创建一个累加器
scala> val accum = sc.longAccumulator("My Accumulator")
// 更新累加器
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
// 访问累加器的值
scala> accum.value
res2: Long = 10
除了内建的long型累加器,Spark允许用户自己定义累加器。继承自 AccumulatorV2 重写即可
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// 创建累加器
val myVectorAcc = new VectorAccumulatorV2
// 注册到 spark context
sc.register(myVectorAcc, "MyVectorAcc1")
Note:Executor中的Task不能访问累加器的值,对于他们来说只写。在行动算子中更新累加器,Spark 保证每一个Task的更新操作只被执行一次,就算重启Task不会修改累加器的值。而在转换算子中更新累加器,发生task或stage重新执行时,会让累加器的值被更新不止一次。
版权声明:本文为zhangtikang134原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。