A Broadcast variable can be thought of as a shared, read-only variable: we broadcast a dataset once, and every task on every node can then read it. The data is kept in memory as a single copy per node, rather than one copy per task, which saves memory and improves efficiency. That said, because the data lives in memory, it is best to keep broadcast datasets small (roughly under 1 GB).
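The memory-saving point can be illustrated outside of Flink: instead of giving each task its own copy of a lookup table, all tasks on a node read one shared, read-only instance. A minimal plain-Java sketch (class, keys, and values are made up for illustration):

```java
import java.util.Map;

public class SharedLookupDemo {
    // one read-only copy of the "broadcast" data, shared by all tasks in the process
    static final Map<String, Double> LIMITS = Map.of("p1.temp", 100.0, "p2.temp", 80.0);

    static double lookup(String key) {
        return LIMITS.getOrDefault(key, Double.NaN);
    }

    public static void main(String[] args) {
        // several "tasks" (threads) all read the same instance: no per-task copy
        for (int i = 0; i < 4; i++) {
            new Thread(() -> System.out.println(lookup("p1.temp"))).start();
        }
    }
}
```
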
Below I focus on the broadcast stream (BroadcastStream) that I used for real-time data processing.
1. Declare the state descriptor
MapStateDescriptor<String, Map> stateSpaceUpAndDown = new MapStateDescriptor<>("stateSpaceUpAndDown", String.class, Map.class);
2. Consume the data to be broadcast and transform it
FlinkKafkaConsumer09<String> limitConsumer = new FlinkKafkaConsumer09<String>(upDownLimitTopic, new SimpleStringSchema(), properties);
DataStreamSource<String> inputIndiRela = env.addSource(limitConsumer);
SingleOutputStreamOperator<HashMap<String, Tuple2<BigDecimal, BigDecimal>>> upAndDownLimit = inputIndiRela.map(new MapFunction<String, HashMap<String, Tuple2<BigDecimal, BigDecimal>>>() {
@Override
public HashMap<String, Tuple2<BigDecimal, BigDecimal>> map(String value) throws Exception {
HashMap<String, Tuple2<BigDecimal, BigDecimal>> exMap = new HashMap<>();
// split the message consumed from Kafka on ","
String[] split = value.split(",");
String k = split[0] + "." + split[1];
// upper limit
BigDecimal up = new BigDecimal(split[2]);
// lower limit
BigDecimal down = new BigDecimal(split[3]);
exMap.put(k, new Tuple2<>(up, down));
return exMap;
}
});
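The parsing logic inside map() can be exercised on its own. Below is a minimal sketch of the same rule, with Flink's Tuple2 replaced by a two-element BigDecimal array so it runs without Flink on the classpath; the "pointId,field,up,down" message format is my assumption based on how the fields are used above:

```java
import java.math.BigDecimal;
import java.util.HashMap;

public class LimitParser {
    // parse "pointId,field,up,down" into key "pointId.field" -> [upper, lower]
    static HashMap<String, BigDecimal[]> parse(String value) {
        HashMap<String, BigDecimal[]> exMap = new HashMap<>();
        String[] split = value.split(",");
        String k = split[0] + "." + split[1];
        BigDecimal up = new BigDecimal(split[2]);   // upper limit
        BigDecimal down = new BigDecimal(split[3]); // lower limit
        exMap.put(k, new BigDecimal[]{up, down});
        return exMap;
    }

    public static void main(String[] args) {
        HashMap<String, BigDecimal[]> m = parse("p1,temp,100,-10");
        System.out.println(m.get("p1.temp")[0]); // 100
        System.out.println(m.get("p1.temp")[1]); // -10
    }
}
```
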
3. Register the stream as a broadcast stream
BroadcastStream<HashMap<String, Tuple2<BigDecimal, BigDecimal>>> broadcastStreamUpAndDown = upAndDownLimit.broadcast(stateSpaceUpAndDown);
4. Connect the collected data stream with the broadcast stream and compute
// collected stream
DataStreamSource<String> dataCollect = env.addSource(collectDataConsumer);
// computation
SingleOutputStreamOperator<Tuple3<String, Double, Long>> allUpAndDownStream = dataCollect.connect(broadcastStreamUpAndDown).process(new BroadcastProcessFunction<Tuple3<String, Double, Long>,
HashMap<String, Tuple2<BigDecimal, BigDecimal>>, Tuple3<String, Double, Long>>() {
// accumulates limit entries across broadcast elements (member state of the function)
HashMap<String, Object> map = new HashMap<>();
@Override
public void processElement(Tuple3<String, Double, Long> tuple3, ReadOnlyContext ctx, Collector<Tuple3<String, Double, Long>> out) throws Exception {
ReadOnlyBroadcastState<String, Map> broadcastState = ctx.getBroadcastState(stateSpaceUpAndDown);
// read the value stored under k="key" in the broadcast state
HashMap<String, Tuple2<BigDecimal, BigDecimal>> broadData = (HashMap<String, Tuple2<BigDecimal, BigDecimal>>) broadcastState.get("key");
String key = tuple3.f0;
Double value = tuple3.f1;
if (key != null && !key.isEmpty() && broadData != null && broadData.containsKey(key)) {
double upLimit = broadData.get(key).f0.doubleValue();
double downLimit = broadData.get(key).f1.doubleValue();
// out of range: zero the value and tag the key with "_"
if (value > upLimit || value < downLimit) {
tuple3.setFields(tuple3.f0 + "_", 0.0, tuple3.f2);
}
}
}
out.collect(tuple3);
}
@Override
public void processBroadcastElement(HashMap<String, Tuple2<BigDecimal, BigDecimal>> value, Context ctx, Collector<Tuple3<String, Double, Long>> out) throws Exception {
//write the broadcast stream's data into the map-structured broadcast state
BroadcastState state = ctx.getBroadcastState(stateSpaceUpAndDown);
Set<String> key = value.keySet();
for (String k : key) {
map.put(k, value.get(k));
}
state.put("key", map);
}
}).setParallelism(24);
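The per-element limit check in processElement can likewise be tested without a Flink runtime. A sketch of the same rule in plain Java, using the BigDecimal-array limit map from the parsing sketch instead of Flink's Tuple2 (class and method names are made up): out-of-range values are zeroed and the key is tagged with "_", everything else passes through unchanged.

```java
import java.math.BigDecimal;
import java.util.Map;

public class LimitChecker {
    // apply the broadcast limits to one (key, value) measurement;
    // returns {possiblyTaggedKey, possiblyZeroedValue}
    static Object[] check(String key, double value, Map<String, BigDecimal[]> limits) {
        if (key != null && !key.isEmpty() && limits != null && limits.containsKey(key)) {
            double up = limits.get(key)[0].doubleValue();
            double down = limits.get(key)[1].doubleValue();
            if (value > up || value < down) {
                return new Object[]{key + "_", 0.0}; // out of range: tag key, zero value
            }
        }
        return new Object[]{key, value}; // in range (or no limits known): unchanged
    }
}
```
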
The overall process boils down to these steps:
1. Declare the state descriptor;
2. Broadcast the transformed data to obtain a broadcast stream;
3. Connect the collected stream with the broadcast stream and process the data together.
Note:
If the data you broadcast is a collection, that collection must be thread-safe.
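One way to satisfy that requirement is to back the accumulating member map with a ConcurrentHashMap instead of a plain HashMap. A minimal sketch (the class and method names are made up for illustration):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeLimitStore {
    // thread-safe accumulator for broadcast limit updates
    static final Map<String, double[]> LIMITS = new ConcurrentHashMap<>();

    static void update(String key, double up, double down) {
        LIMITS.put(key, new double[]{up, down}); // safe under concurrent callers
    }
}
```
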
Copyright notice: this is an original article by weixin_42045791, licensed under CC 4.0 BY-SA; when reposting, please include a link to the original source and this notice.