A guide to using Flink broadcast variables

A Broadcast variable can be thought of as a shared, read-only dataset: you broadcast a dataset once, and every task on every node can then read it. The data is held only once per node, in memory, which avoids redundant copies and speeds up lookups. Because it lives entirely in memory, keep broadcast datasets small (roughly 1 GB or less is recommended).
The rest of this post focuses on BroadcastStream, which I used for real-time stream processing.

1. Create a state descriptor

MapStateDescriptor<String, Map> stateSpaceUpAndDown = new MapStateDescriptor<>("stateSpaceUpAndDown", String.class, Map.class);

2. Consume the data to broadcast and transform it

 FlinkKafkaConsumer09<String> limitConsumer = new FlinkKafkaConsumer09<String>(upDownLimitTopic, new SimpleStringSchema(), properties);
 DataStreamSource<String> inputIndiRela = env.addSource(limitConsumer);
 SingleOutputStreamOperator<HashMap<String, Tuple2<BigDecimal, BigDecimal>>> upAndDownLimit = inputIndiRela.map(new MapFunction<String, HashMap<String, Tuple2<BigDecimal, BigDecimal>>>() {

            @Override
            public HashMap<String, Tuple2<BigDecimal, BigDecimal>> map(String value) throws Exception {

                HashMap<String, Tuple2<BigDecimal, BigDecimal>> exMap = new HashMap<>();
                // split the Kafka message on ","
                String[] split = value.split(",");
                String k = split[0] + "." + split[1];
                Tuple2<BigDecimal, BigDecimal> v = new Tuple2<>();
                // upper limit
                BigDecimal up = new BigDecimal(split[2]);
                // lower limit
                BigDecimal down = new BigDecimal(split[3]);

                v.setField(up, 0);
                v.setField(down, 1);
                exMap.put(k, v);
                return exMap;
            }
        });
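The parsing inside the MapFunction can be exercised on its own. The sketch below is plain Java with no Flink dependency; the sample message and the meaning of each field are assumptions inferred from the split logic above, and the class and method names are hypothetical:

```java
import java.math.BigDecimal;
import java.util.AbstractMap;
import java.util.Map;

public class LimitParser {
    // Parses a message assumed to look like "device,metric,upper,lower"
    // into an entry keyed by "device.metric" holding [upper, lower] limits,
    // mirroring the split/concatenate logic of the MapFunction above.
    static Map.Entry<String, BigDecimal[]> parse(String value) {
        String[] split = value.split(",");
        String key = split[0] + "." + split[1];
        BigDecimal up = new BigDecimal(split[2]);   // upper limit
        BigDecimal down = new BigDecimal(split[3]); // lower limit
        return new AbstractMap.SimpleEntry<>(key, new BigDecimal[]{up, down});
    }

    public static void main(String[] args) {
        Map.Entry<String, BigDecimal[]> e = parse("pump01,temperature,95.5,10.0");
        System.out.println(e.getKey());      // pump01.temperature
        System.out.println(e.getValue()[0]); // 95.5
        System.out.println(e.getValue()[1]); // 10.0
    }
}
```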

3. Register the stream as a broadcast stream

BroadcastStream<HashMap<String, Tuple2<BigDecimal, BigDecimal>>> broadcastStreamUpAndDown = upAndDownLimit.broadcast(stateSpaceUpAndDown);

4. Connect the collected data stream with the broadcast stream for joint processing

// the collected data stream
DataStreamSource<String> dataCollect= env.addSource(collectDataConsumer);
// connect and process
  SingleOutputStreamOperator<Tuple3<String, Double, Long>> allUpAndDownStream = dataCollect.connect(broadcastStreamUpAndDown).process(new BroadcastProcessFunction<Tuple3<String, Double, Long>,
                HashMap<String, Tuple2<BigDecimal, BigDecimal>>, Tuple3<String, Double, Long>>() {
            HashMap<String, Object> map = new HashMap<>();

            @Override
            public void processElement(Tuple3<String, Double, Long> tuple3, ReadOnlyContext ctx, Collector<Tuple3<String, Double, Long>> out) throws Exception {
             
                ReadOnlyBroadcastState<String, Map> broadcastState = ctx.getBroadcastState(stateSpaceUpAndDown);
                // read the limits stored under "key" in the broadcast state
                HashMap<String, Tuple2<BigDecimal, BigDecimal>> broadData = (HashMap<String, Tuple2<BigDecimal, BigDecimal>>) broadcastState.get("key");
                String key = tuple3.f0;
                Double value = tuple3.f1;
                if (key != null && !key.isEmpty() && broadData != null) {
                    Double upLimit = 0.0;
                    Double downLimit = 0.0;
                    try {
                        upLimit = broadData.get(key).f0.doubleValue();
                        downLimit = broadData.get(key).f1.doubleValue();
                    } catch (NullPointerException e) {
                        // no limits configured for this key; keep the 0.0 defaults
                    }
                    if (value > upLimit || value < downLimit) {
                        // out-of-range reading: suffix the key and zero the value
                        tuple3.setFields(tuple3.f0 + "_", 0.0, tuple3.f2);
                    }
                }
                out.collect(tuple3);
            }

            @Override
            public void processBroadcastElement(HashMap<String, Tuple2<BigDecimal, BigDecimal>> value, Context ctx, Collector<Tuple3<String, Double, Long>> out) throws Exception {
                // write the broadcast element into the map-structured broadcast state
                BroadcastState state = ctx.getBroadcastState(stateSpaceUpAndDown);
                Set<String> key = value.keySet();
                for (String k : key) {
                    map.put(k, value.get(k));
                }
                state.put("key", map);
            }
        }).setParallelism(24);
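The core of processElement, flagging a reading that falls outside its limits, can also be checked without the Flink runtime. This is a plain-Java sketch of that rule (the re-keying with a "_" suffix and the zeroed value mirror the setFields call above; the class and method names are hypothetical):

```java
public class LimitCheck {
    // Mirrors the out-of-range handling in processElement above:
    // a reading outside [down, up] is re-keyed with a "_" suffix
    // and its value is zeroed; in-range readings pass through unchanged.
    static Object[] check(String key, double value, long ts, double up, double down) {
        if (value > up || value < down) {
            return new Object[]{key + "_", 0.0, ts};
        }
        return new Object[]{key, value, ts};
    }

    public static void main(String[] args) {
        Object[] ok = check("pump01.temperature", 50.0, 1L, 95.5, 10.0);
        Object[] bad = check("pump01.temperature", 120.0, 2L, 95.5, 10.0);
        System.out.println(ok[0] + " " + ok[1]);   // pump01.temperature 50.0
        System.out.println(bad[0] + " " + bad[1]); // pump01.temperature_ 0.0
    }
}
```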

The overall process boils down to three steps:
1. Create a state descriptor;
2. Broadcast the transformed data to obtain a broadcast stream;
3. Connect the collected stream with the broadcast stream and process the data.

Note:
If the data being broadcast is a collection, make sure that collection is thread safe.
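One way to satisfy this is a ConcurrentHashMap in place of a plain HashMap, e.g. for the `map` field shared inside the BroadcastProcessFunction above. This is a minimal sketch of the general point, not the author's code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeLimits {
    // ConcurrentHashMap tolerates concurrent reads and writes without
    // external locking, unlike a plain HashMap, whose internal structure
    // can be corrupted by unsynchronized concurrent modification.
    private final Map<String, Double> limits = new ConcurrentHashMap<>();

    void update(String key, double value) { limits.put(key, value); }
    Double lookup(String key) { return limits.get(key); }

    public static void main(String[] args) {
        SafeLimits s = new SafeLimits();
        s.update("pump01.temperature", 95.5);
        System.out.println(s.lookup("pump01.temperature")); // 95.5
    }
}
```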


Copyright notice: this is an original article by weixin_42045791, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.