Using Broadcast Variables in Flink

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables


Definition of broadcast variables:

Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.


  • Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String).
  • Access: a broadcast set is accessible at the target operator via getRuntimeContext().getBroadcastVariable(String), using the same name it was registered under.
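To illustrate the two steps in the list above together with the "data-dependent parameterization" use case from the definition, here is a minimal sketch that is not taken from the official docs. It assumes a job that scales every number by the maximum of the same DataSet: the maximum is computed at runtime by a reduce, registered with withBroadcastSet under the hypothetical name "maxValue", and read back in open(). The object name BroadcastMaxExample and the helper scale are also hypothetical.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

// Hypothetical example: scale each value by the maximum of the same DataSet.
object BroadcastMaxExample {

  // Pure scaling logic, kept separate so it can be checked without running a job.
  def scale(value: Int, max: Int): Double = value.toDouble / max

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val numbers = env.fromElements(2, 4, 8)

    // A DataSet with a single element: the maximum of `numbers`, computed at runtime.
    val maxSet: DataSet[Int] = numbers.reduce((a: Int, b: Int) => math.max(a, b))

    val scaled = numbers.map(new RichMapFunction[Int, Double] {
      private var maxValue: Int = 1

      override def open(parameters: Configuration): Unit = {
        // The broadcast DataSet arrives as a java.util.List; here it holds exactly one element.
        maxValue = getRuntimeContext.getBroadcastVariable[Int]("maxValue").asScala.head
      }

      override def map(value: Int): Double = scale(value, maxValue)
    }).withBroadcastSet(maxSet, "maxValue") // "maxValue" is a hypothetical name

    scaled.print()
  }
}
```

Because the broadcast value is produced by the job itself, the parameter adapts to the data instead of being hard-coded in the driver.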


Here is the code:

 

package com.daxin

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._  // needed for the implicit asScala conversion


/**
  * Created by Daxin on 2017/4/16.
  */
object Broadcast {

  def main(args: Array[String]) {


    val env = ExecutionEnvironment.getExecutionEnvironment

    // 1. The DataSet to be broadcast
    val toBroadcast = env.fromElements(1, 2, 3)

    val data = env.fromElements("1", "2", "5")

    /**
      * From the Javadoc of RichMapFunction:
      * Rich variant of the MapFunction. As a RichFunction, it gives access to
      * the RuntimeContext and provides setup and teardown methods:
      * RichFunction.open(org.apache.flink.configuration.Configuration) and RichFunction.close().
      * <br>In other words, RichMapFunction is a variant of MapFunction that can access the
      * runtime context (RuntimeContext) and provides open and close methods.
      * <br>
      */
    val result = data.map(new RichMapFunction[String, String]() {
      var broadcastSet: Traversable[Integer] = null

      override def open(config: Configuration): Unit = {
        // 3. Access the broadcast DataSet as a Collection
        broadcastSet = getRuntimeContext().getBroadcastVariable[Integer]("broadcastSetName").asScala
      }

      def map(in: String): String = {
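        // Look up the current record in the broadcast collection
        if (broadcastSet.toList.contains(in.toInt))
          in // simply return the input string
        else
          // Return some diagnostic info: the size of the broadcast set, whether it contains the raw
          // String (always false, because the elements are Integers), and the class of its elements
          in + "  " + broadcastSet.toList.size + "   " + broadcastSet.toList.contains(in) + "   " + broadcastSet.toList(0).getClass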
      }
    }).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

   

    result.print()
  }

}
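The map above calls toList on the broadcast collection for every record. A variant worth considering, assuming only membership checks are needed, converts the broadcast list to an immutable Set once in open(), so each parallel instance builds its lookup structure a single time. This is a sketch rather than the author's code; the object name BroadcastSetLookup and the helper describe are hypothetical, and the broadcast name "broadcastSetName" is reused from the example above.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

object BroadcastSetLookup {

  // Pure lookup logic (hypothetical helper), checkable without running a job.
  def describe(in: String, allowed: Set[Int]): String =
    if (allowed.contains(in.toInt)) in else s"$in is not in the broadcast set"

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toBroadcast = env.fromElements(1, 2, 3)
    val data = env.fromElements("1", "2", "5")

    val result = data.map(new RichMapFunction[String, String] {
      private var allowed: Set[Int] = Set.empty

      override def open(config: Configuration): Unit = {
        // Converted once per parallel instance instead of once per record.
        allowed = getRuntimeContext.getBroadcastVariable[Int]("broadcastSetName").asScala.toSet
      }

      override def map(in: String): String = describe(in, allowed)
    }).withBroadcastSet(toBroadcast, "broadcastSetName")

    result.print()
  }
}
```

A Set gives constant-time membership checks on average, whereas List.contains scans the whole list.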

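Because a broadcast variable is retrieved by name, the name passed to withBroadcastSet when registering it must match the name passed to getBroadcastVariable when reading it. For a complete example that uses broadcast variables, see the KMeans Algorithm example.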
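One way to keep registration and lookup names from drifting apart, sketched here under stated assumptions, is to define each name once as a constant and use it on both sides. The sketch also registers two broadcast sets on the same operator by chaining withBroadcastSet, each under its own name. The names "allowedIds" and "prefix", the object NamedBroadcasts, and the helper render are all hypothetical.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

object NamedBroadcasts {
  // Hypothetical broadcast names, defined once so that withBroadcastSet and
  // getBroadcastVariable always use the same strings.
  val AllowedIds = "allowedIds"
  val Prefix = "prefix"

  // Pure formatting logic (hypothetical helper), checkable without running a job.
  def render(id: Int, allowed: Set[Int], prefix: String): String =
    if (allowed.contains(id)) prefix + id else s"skip $id"

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val allowed = env.fromElements(1, 2, 3)
    val prefix = env.fromElements("id-")
    val ids = env.fromElements(1, 5)

    val result = ids.map(new RichMapFunction[Int, String] {
      private var allowedSet: Set[Int] = Set.empty
      private var prefixValue: String = ""

      override def open(config: Configuration): Unit = {
        val ctx = getRuntimeContext
        allowedSet = ctx.getBroadcastVariable[Int](AllowedIds).asScala.toSet
        prefixValue = ctx.getBroadcastVariable[String](Prefix).asScala.head
      }

      override def map(id: Int): String = render(id, allowedSet, prefixValue)
    })
      // Each broadcast set is registered under its own name; the calls can be chained.
      .withBroadcastSet(allowed, AllowedIds)
      .withBroadcastSet(prefix, Prefix)

    result.print()
  }
}
```

With the names held in constants, a typo becomes a compile error instead of a missing broadcast variable at runtime.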



Note:

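   Because the contents of a broadcast variable are kept in memory on every node of the cluster, broadcast variables should not be too large. For simple things such as scalar values, you can simply make the value part of the function's closure, or pass it in a Configuration using the org.apache.flink.api.scala.DataSet#withParameters method.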
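Both alternatives for small values can be sketched as follows, assuming a filter that keeps values above a limit. Option 1 captures the scalar in the function closure; option 2 stores it in a Configuration passed with withParameters, which Flink hands to the function's open() method. The key "limit", the object SmallParameters, and the helper above are hypothetical; Configuration.setInteger/getInteger are the Flink 1.x accessors.

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object SmallParameters {

  // Pure predicate shared by both variants (hypothetical helper).
  def above(value: Int, limit: Int): Boolean = value > limit

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(1, 5, 10)

    // Option 1: the scalar is captured in the closure and shipped with the function.
    val limit = 4
    val viaClosure = data.filter((v: Int) => above(v, limit))

    // Option 2: the scalar travels in a Configuration set via withParameters;
    // Flink passes that Configuration to open(). "limit" is a hypothetical key.
    val conf = new Configuration()
    conf.setInteger("limit", 4)
    val viaParameters = data.filter(new RichFilterFunction[Int] {
      private var limitFromConf: Int = 0

      override def open(parameters: Configuration): Unit = {
        limitFromConf = parameters.getInteger("limit", 0)
      }

      override def filter(value: Int): Boolean = above(value, limitFromConf)
    }).withParameters(conf)

    viaClosure.print()
    viaParameters.print()
  }
}
```

Neither option materializes a DataSet on every node, which is why they suit single scalar values better than a broadcast set does.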







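Copyright notice: This is an original article by Dax1n, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.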