侧流输出示例
/**
 * Side-output demo.
 *
 * <p>A side output is essentially a tagged sub-stream of one operator's output. Records
 * emitted with {@code ctx.output(tag, value)} carry a tag and form side streams. Records
 * emitted with {@code out.collect(value)} are untagged and form the main stream.
 *
 * <ol>
 *   <li>Side stream: tagged data; it can only be obtained through
 *       {@code getSideOutput(tag)}.</li>
 *   <li>Main stream: untagged data; it is used directly as the returned DataStream.</li>
 * </ol>
 *
 * <p>Each socket line is classified as follows:
 * <ul>
 *   <li>even integers go to {@code evenTag};</li>
 *   <li>odd integers go to {@code oddTag};</li>
 *   <li>anything that is not an int goes to {@code strTag}.</li>
 * </ul>
 * Nothing is passed to {@code out.collect}, so the main-stream sink prints nothing.
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        // Because of type erasure, an OutputTag needs explicit type information. Either pass a
        // TypeInformation (as below) or create an anonymous subclass, e.g.
        // new OutputTag<Integer>("odd-data") {};
        OutputTag<Integer> oddTag = new OutputTag<>("odd-data", Types.INT);
        OutputTag<Integer> evenTag = new OutputTag<>("even-data", Types.INT);
        OutputTag<String> strTag = new OutputTag<>("str-data", Types.STRING);
        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                int num;
                try {
                    num = Integer.parseInt(line);
                } catch (NumberFormatException e) {
                    // Not an integer: route the raw line to the string side output.
                    ctx.output(strTag, line);
                    return;
                }
                if (num % 2 == 0) {
                    ctx.output(evenTag, num);
                } else {
                    // % keeps the sign, so negative odd numbers (remainder -1) also land here.
                    ctx.output(oddTag, num);
                }
            }
        });
        DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
        DataStream<Integer> oddStream = mainStream.getSideOutput(oddTag);
        DataStream<String> strStream = mainStream.getSideOutput(strTag);
        evenStream.print("偶数流");
        oddStream.print("奇数流");
        strStream.print("字符流");
        mainStream.print("主流");
        env.execute();
    }
}
使用侧流输出获取窗口中迟到的数据
/**
 * Uses a side output to capture records that arrive too late for their event-time window.
 *
 * <p>Only event-time windows can have late data. A late record is sent straight to the side
 * output; it is not added to any window and does not trigger one.
 *
 * <p>Input lines are split on commas:
 * <ul>
 *   <li>field 0 is the event timestamp (a long);</li>
 *   <li>field 1 is the key;</li>
 *   <li>field 2 is an int count.</li>
 * </ul>
 * A line starting with {@code error} deliberately fails the job. Together with checkpointing
 * and the fixed-delay restart strategy, this demonstrates failure recovery.
 */
public class GetWindowLateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 10000 ms.
        env.enableCheckpointing(10000);
        // Restart strategy: at most 5 restarts, 5000 ms apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        // Assign event-time timestamps and watermarks (zero allowed out-of-orderness).
        // NOTE(review): the socket source has parallelism 1. If the downstream parallelism is
        // higher, watermarks advance per subtask, and a window fires only once every subtask
        // has seen data — confirm the intended parallelism when testing.
        SingleOutputStreamOperator<String> linesWithWatermark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                // The error check must run here. This is the first place the line
                                // is parsed, and an "error..." line would otherwise fail below with
                                // a NumberFormatException before any later operator could see it.
                                failOnErrorLine(element);
                                return Long.parseLong(element.split(",")[0]);
                            }
                        }));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWatermark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });
        // Key by the word.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(tp -> tp.f0);
        // Tag used to collect records that arrive after their window has fired.
        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
        // 5-second tumbling event-time windows; late records go to the side output.
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(lateDataTag);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowedStream.sum(1);
        // Retrieve the late records from the window operator's side output.
        DataStream<Tuple2<String, Integer>> lateDataStream = result.getSideOutput(lateDataTag);
        result.print();
        lateDataStream.print("迟到的数据");
        env.execute();
    }

    /** Deliberately fails the job when a line is marked as bad input (starts with "error"). */
    private static void failOnErrorLine(String line) {
        if (line.startsWith("error")) {
            throw new RuntimeException("错误数据");
        }
    }
}
版权声明:本文为JinVijay原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。