广播变量简介
在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量Broadcast便是解决这种情况的。
我们可以把广播变量理解为一个公共的共享变量：把一个 DataSet 数据集广播出去之后，不同的 task 在各节点上都能够获取到它，并且这份数据在每个节点上只会保存一份。
适用范围
由于广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大
需求
示例数据
{"dt":"2019-11-19 20:33:39","countryCode":"TW","data":[{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]} {"dt":"2019-11-19 20:33:41","countryCode":"KW","data":[{"type":"s2","score":0.2,"level":"A"},{"type":"s1","score":0.2,"level":"D"}]} {"dt":"2019-11-19 20:33:43","countryCode":"HK","data":[{"type":"s5","score":0.5,"level":"C"},{"type":"s2","score":0.8,"level":"B"}]} {"dt":"2019-11-19 20:33:39","countryCode":"TW","data":[{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]}
输出结果
"dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s1","score":0.8,"level":"D" "dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s2","score":0.1,"level":"B"
实现
public class Main {
private static final int RESTART_ATTEMPTS = 5;
private static final int RESTART_INTERVAL = 20;
private static Logger logger = LoggerFactory.getLogger(Main.class);
/**
 * Entry point: reads generated JSON events, enriches the country code with an
 * area mapping delivered via Flink broadcast state, and prints the flattened result.
 *
 * @param args unused
 * @throws Exception if the Flink job fails to start or execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Checkpointing configuration.
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    env.enableCheckpointing(5000L);
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.setCheckpointTimeout(100000L);
    checkpointConfig.setFailOnCheckpointingErrors(true);
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // No state backend needed in the test environment.
    // FsStateBackend fsStateBackend = new FsStateBackend(CheckpointUtils.getCheckpointDir());
    // env.setStateBackend(fsStateBackend);
    // Fixed-delay restart strategy.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            RESTART_ATTEMPTS,                        // number of restart attempts
            org.apache.flink.api.common.time.Time.of(RESTART_INTERVAL, TimeUnit.SECONDS)
    ));
    // Custom source producing JSON event strings.
    DataStreamSource<String> dataStreamSource = env.addSource(new DataSource());
    // Option 1: direct per-subtask Redis lookup.
    // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator = dataStreamSource.flatMap(new SimpleFlatMapFunction());
    // Option 2: async I/O.
    // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
    //         AsyncDataStream.unorderedWait(dataStreamSource, new SimpaleAsyncIoFunction(), 2000, TimeUnit.MILLISECONDS);
    // Option 3: broadcast state (active below).
    final MapStateDescriptor<String, String> broadcastDes = new MapStateDescriptor<>(
            "broadcast",
            String.class,
            String.class);
    BroadcastStream<String> broadcast = env.addSource(new BroadcastSourceFunction()).broadcast(broadcastDes);
    SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
            dataStreamSource.connect(broadcast).process(new BroadcastProcessFunction<String, String, OutData>() {
        @Override
        public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<OutData> collector) throws Exception {
            ReadOnlyBroadcastState<String, String> broadcastState = readOnlyContext.getBroadcastState(broadcastDes);
            String mappingJson = broadcastState.get("broadcastState");
            OriginData originData = JSONObject.parseObject(s, OriginData.class);
            // BUGFIX: before the first broadcast element arrives the state is empty and
            // mappingJson is null; the original code NPE'd here. Fall back to the raw
            // country code until the mapping is available.
            String areaCode = originData.countryCode;
            if (mappingJson != null) {
                HashMap<String, String> mapping = JSONObject.parseObject(mappingJson, HashMap.class);
                String mapped = mapping.get(originData.countryCode);
                if (mapped != null) {
                    areaCode = mapped;
                }
            }
            for (Data datum : originData.data) {
                collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
            }
        }

        @Override
        public void processBroadcastElement(String s, Context context, Collector<OutData> collector) throws Exception {
            // put() overwrites an existing entry, so the former remove() was redundant.
            context.getBroadcastState(broadcastDes).put("broadcastState", s);
        }
    });
    // Serialize the enriched records back to JSON and print them.
    SingleOutputStreamOperator<String> map = outDataSingleOutputStreamOperator.map(new MapFunction<OutData, String>() {
        @Override
        public String map(OutData outData) throws Exception {
            return JSON.toJSONString(outData);
        }
    });
    map.print();
    env.execute();
}
/**
 * Enrichment via a per-subtask in-memory cache of the Redis "areas" hash,
 * refreshed every 3 seconds by a background scheduler.
 */
static class SimpleFlatMapFunction extends RichFlatMapFunction<String, OutData> {
    // BUGFIX: the map reference is reassigned by the refresh thread and read by the
    // task thread, so it must be volatile to guarantee visibility of updates.
    private transient volatile ConcurrentHashMap<String, String> hashMap;
    // BUGFIX: keep the scheduler so it can be shut down in close(); the original
    // leaked one non-daemon thread per open() (e.g. on every job restart).
    private transient ScheduledExecutorService refreshExecutor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        System.out.println("更新缓存");
        hashMap = loadAreaMap();
        refreshExecutor = Executors.newScheduledThreadPool(1);
        refreshExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("更新缓存");
                hashMap = loadAreaMap();
            }
        }, 3, 3, TimeUnit.SECONDS);
    }

    @Override
    public void close() throws Exception {
        if (refreshExecutor != null) {
            refreshExecutor.shutdownNow();
        }
        super.close();
    }

    /** Scans the Redis "areas" hash and inverts it: country code -> area code. */
    private ConcurrentHashMap<String, String> loadAreaMap() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        Jedis jedis = RedisFactory.getJedisCluster();
        try {
            ScanResult<Map.Entry<String, String>> areas = jedis.hscan("areas", "0");
            for (Map.Entry<String, String> entry : areas.getResult()) {
                // Each hash value is a comma-separated list of country codes.
                for (String code : entry.getValue().split(",")) {
                    map.put(code, entry.getKey());
                }
            }
        } finally {
            // BUGFIX: ensure the connection is returned even if the scan throws.
            jedis.close();
        }
        return map;
    }

    @Override
    public void flatMap(String s, Collector<OutData> collector) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String areaCode = hashMap.get(originData.countryCode);
        for (Data datum : originData.data) {
            collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
        }
    }
}
/**
 * Enrichment via Vert.x async Redis client: each event triggers an HSCAN of the
 * "areas" hash and the mapping is applied in the callback.
 */
static class SimpaleAsyncIoFunction extends RichAsyncFunction<String, OutData> {
    private transient RedisClient redisClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);  // BUGFIX: was called twice.
        RedisOptions config = new RedisOptions();
        config.setHost("hadoop01");
        config.setPort(6379);
        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(10);
        vo.setWorkerPoolSize(20);
        Vertx vertx = Vertx.vertx(vo);
        redisClient = RedisClient.create(vertx, config);
    }

    @Override
    public void close() throws Exception {
        super.close();  // BUGFIX: was called twice.
        if (redisClient != null) {
            redisClient.close(null);
        }
    }

    @Override
    public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String countryCode = originData.countryCode;
        redisClient.hscan("areas", "0", ScanOptions.NONE, new Handler<AsyncResult<JsonArray>>() {
            @Override
            public void handle(AsyncResult<JsonArray> result) {
                if (result.succeeded()) {
                    JsonArray reply = result.result();
                    if (reply == null) {
                        // Complete with an empty collection rather than null.
                        resultFuture.complete(Collections.emptyList());
                        return;
                    }
                    // HSCAN reply layout: [cursor, [field1, value1, field2, value2, ...]]
                    // e.g. ["AREA_US","US","AREA_CT","TW,HK","AREA_AR","PK,KW,SA,XX","AREA_IN","IN"]
                    JsonArray fieldsAndValues = reply.getJsonArray(1);
                    HashMap<String, String> mapping = new HashMap<>();
                    for (int i = 0; i + 1 < fieldsAndValues.size(); i += 2) {
                        String areaCode = fieldsAndValues.getString(i);
                        for (String code : fieldsAndValues.getString(i + 1).split(",")) {
                            mapping.put(code, areaCode);
                        }
                    }
                    String dt = originData.dt;
                    String country = mapping.get(countryCode);
                    // BUGFIX: ResultFuture.complete() only honors its first invocation;
                    // the original completed inside the loop and dropped every record
                    // after the first. Collect all records, then complete once.
                    List<OutData> out = new ArrayList<>();
                    for (Data datum : originData.data) {
                        out.add(OutData.of(dt, country, datum.type, datum.score, datum.level));
                    }
                    resultFuture.complete(out);
                } else if (result.failed()) {
                    resultFuture.complete(Collections.emptyList());
                }
            }
        });
    }
}
/**
 * Broadcast-side source: every 3 seconds, scans the Redis "areas" hash, inverts
 * it into a country-code -> area-code map, and emits it as a JSON string.
 */
static class BroadcastSourceFunction extends RichSourceFunction<String> {
    // BUGFIX: the original cancel() was a no-op and the while(true) loop could
    // never be stopped; guard the loop with a volatile flag.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            Jedis jedisCluster = RedisFactory.getJedisCluster();
            try {
                ScanResult<Map.Entry<String, String>> areas = jedisCluster.hscan("areas", "0");
                HashMap<String, String> hashMap = new HashMap<>();
                for (Map.Entry<String, String> entry : areas.getResult()) {
                    // Each hash value is a comma-separated list of country codes.
                    for (String code : entry.getValue().split(",")) {
                        hashMap.put(code, entry.getKey());
                    }
                }
                sourceContext.collect(JSON.toJSONString(hashMap));
            } finally {
                // BUGFIX: close the connection even if the scan or emit throws.
                jedisCluster.close();
            }
            TimeUnit.SECONDS.sleep(3);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
/** Creates Jedis connections to the demo Redis instance. Not instantiable. */
static class RedisFactory {
    private RedisFactory() {
    }

    /**
     * Returns a NEW connection on every call; the caller owns it and must close it.
     * (The original cached the last connection in a mutable static field that was
     * never reused and made the factory thread-unsafe — removed.)
     */
    public static Jedis getJedisCluster() {
        return new Jedis(new HostAndPort("hadoop01", 6379));
    }
}
/**
 * POJO matching the source JSON: {"dt": ..., "countryCode": ..., "data": [...]}.
 * Fields are public for fastjson (de)serialization.
 */
static class OriginData {
    public String dt;
    public String countryCode;
    public ArrayList<Data> data;

    /** No-arg constructor required by fastjson. */
    public OriginData() {
    }

    public OriginData(String dt, String countryCode, ArrayList<Data> data) {
        this.dt = dt;
        this.countryCode = countryCode;
        this.data = data;
    }

    /** Static factory mirroring the other POJOs' {@code of(...)} convention. */
    public static OriginData of(String dt, String countryCode, ArrayList<Data> data) {
        return new OriginData(dt, countryCode, data);
    }
}
/**
 * One scored entry inside an event's "data" array: type + score + level.
 * Fields are public for fastjson (de)serialization.
 */
static class Data {
    public String type;
    public Double score;
    public String level;

    /** No-arg constructor required by fastjson. */
    public Data() {
    }

    public Data(String type, Double score, String level) {
        this.type = type;
        this.score = score;
        this.level = level;
    }

    /** Static factory mirroring the other POJOs' {@code of(...)} convention. */
    public static Data of(String type, Double score, String level) {
        return new Data(type, score, level);
    }
}
/**
 * Flattened output record: one event/data pair with the enriched country code.
 * Fields are public for fastjson serialization.
 */
static class OutData {
    public String dt;
    public String countryCode;
    public String type;
    public Double score;
    public String level;

    /** No-arg constructor required by fastjson. */
    public OutData() {
    }

    public OutData(String dt, String countryCode, String type, Double score, String level) {
        this.dt = dt;
        this.countryCode = countryCode;
        this.type = type;
        this.score = score;
        this.level = level;
    }

    /** Static factory mirroring the other POJOs' {@code of(...)} convention. */
    public static OutData of(String dt, String countryCode, String type, Double score, String level) {
        return new OutData(dt, countryCode, type, score, level);
    }
}
/**
 * Test source: emits one random two-entry JSON event per second.
 */
static class DataSource extends RichSourceFunction<String> {
    private static final String YYYYMMDDHHMMSS = "yyyy-MM-dd HH:mm:ss";
    private static final String[] countryCodes = {"US", "TW", "HK", "PK", "KW", "SA", "XX", "IN"};
    private static final String[] users = {"s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11", "s12", "s13", "s14", "s15", "s16"};
    private static final String[] levels = {"A", "B", "C", "D"};
    // BUGFIX: SimpleDateFormat is not thread-safe; a static instance shared by
    // parallel source subtasks can silently corrupt output. Random was static for
    // no reason either — both are now per-instance.
    private final Random random = new Random();
    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYYMMDDHHMMSS);
    // BUGFIX: the original cancel() was a no-op; guard the loop with a volatile flag.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            int i = random.nextInt(4);
            long time = System.currentTimeMillis() + 1000 * i;
            String resDate = simpleDateFormat.format(time);
            i = random.nextInt(users.length);
            String user1 = users[i];
            Double score1 = Double.valueOf(String.format("%.1f", random.nextDouble()));
            String countCode1 = countryCodes[i % countryCodes.length];
            String level1 = levels[i % levels.length];
            i = random.nextInt(users.length);
            String user2 = users[i];
            // Both entries of one event share the same country code.
            String countCode2 = countCode1;
            String level2 = levels[i % levels.length];
            Double score2 = Double.valueOf(String.format("%.1f", random.nextDouble()));
            ArrayList<Data> datas = new ArrayList<>();
            datas.add(Data.of(user1, score1, level1));
            datas.add(Data.of(user2, score2, level2));
            OriginData originData = OriginData.of(resDate, countCode1, datas);
            sourceContext.collect(JSON.toJSONString(originData));
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
}
`