共享变量

当函数闭包被传递到远端集群节点上时,它是一个副本。它使用的所有变量在每个节点上都复制了一份,这些变量的修改都不会被传回Driver程序。为了跨Task读写共享变量,所以Spark引入了广播变量、累加器。

广播变量

广播变量让每个节点持有一个只读变量的值,而不是让每个Task都有一份,节省空间。跨多个stage使用相同的数据时,广播变量很有用。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)

累加器

累加器支持在并行执行的Task中进行add操作,以获取相关的聚合值,一般用于计数或者求和。

// 创建一个累加器
scala> val accum = sc.longAccumulator("My Accumulator")
// 更新累加器
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
// 访问累加器的值
scala> accum.value
res2: Long = 10

除了内建的long型累加器,Spark允许用户自己定义累加器。继承自 AccumulatorV2 重写即可

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// 创建累加器
val myVectorAcc = new VectorAccumulatorV2
// 注册到 spark context
sc.register(myVectorAcc, "MyVectorAcc1")

Note:Executor中的Task不能访问累加器的值,对于他们来说只写。在行动算子中更新累加器,Spark 保证每一个Task的更新操作只被执行一次,就算重启Task不会修改累加器的值。而在转换算子中更新累加器,发生task或stage重新执行时,会让累加器的值被更新不止一次。


版权声明:本文为zhangtikang134原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。