Flink侧流输出

侧流输出示例

/**
 * Side-output demo.
 *
 * <p>A side output is, in essence, a way of tagging the records of a stream: records that are not
 * tagged form the main stream, and tagged records form side streams.
 *
 * <p>In Flink, the records of a DataStream therefore fall into two groups:
 * <ol>
 *   <li>side streams: tagged records, which can only be retrieved through their
 *       {@link OutputTag} via {@code getSideOutput};</li>
 *   <li>the main stream: untagged records, consumed directly through the DataStream.</li>
 * </ol>
 *
 * <p>Each line read from the socket is routed to exactly one of three side outputs:
 * <ul>
 *   <li>even integers;</li>
 *   <li>odd integers;</li>
 *   <li>lines that do not parse as an {@code int}.</li>
 * </ul>
 * Nothing is emitted to the main stream ({@code out.collect} is never called), so the "主流" sink
 * prints nothing.
 */
public class SideOutputDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        // The tag's element type must be given explicitly (as TypeInformation), or the tag must be
        // created as an anonymous subclass so Flink can recover the generic type, e.g.:
        //   OutputTag<Integer> oddTag = new OutputTag<Integer>("odd-data") {};
        OutputTag<Integer> oddTag = new OutputTag<>("odd-data", Types.INT);
        OutputTag<Integer> evenTag = new OutputTag<>("even-data", Types.INT);
        OutputTag<String> strTag = new OutputTag<>("str-data", Types.STRING);

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                int num;
                try {
                    num = Integer.parseInt(line);
                } catch (NumberFormatException e) {
                    // Not an integer: send the raw line to the string side output.
                    ctx.output(strTag, line);
                    return;
                }
                if (num % 2 == 0) {
                    // Even number.
                    ctx.output(evenTag, num);
                } else {
                    // Odd number (num % 2 is 1 or -1).
                    ctx.output(oddTag, num);
                }
            }
        });

        // Side outputs can only be fetched by their tag.
        DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
        DataStream<Integer> oddStream = mainStream.getSideOutput(oddTag);
        DataStream<String> strStream = mainStream.getSideOutput(strTag);

        evenStream.print("偶数流");
        oddStream.print("奇数流");
        strStream.print("字符流");

        mainStream.print("主流");
        env.execute();
    }
}

使用侧流输出获取窗口中迟到的数据

/**
 * Captures records that arrive too late for their event-time window by using a side output.
 *
 * <p>Only event-time windows can have late data. A late record is not added to (and does not
 * re-trigger) its window. Instead it is emitted unchanged to the {@code late-data} side output.
 *
 * <p>Input lines are expected in the form {@code timestamp,word,count}, for example
 * {@code 1000,spark,3}.
 *
 * <p>A line starting with {@code error} deliberately fails the job so that the configured restart
 * strategy can be observed.
 */
public class GetWindowLateDataDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 10 000 ms.
        env.enableCheckpointing(10000);
        // On failure, restart up to 5 times with a 5000 ms delay between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));

        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        // Assign event timestamps from the first CSV field and generate watermarks with zero
        // allowed out-of-orderness, so any record older than the current watermark is late.
        SingleOutputStreamOperator<String> linesWithWatermark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                // The bad-data check must live here. This is the first place the
                                // line is parsed, so an "error" line would otherwise fail with a
                                // NumberFormatException before ever reaching the map below.
                                if (element.startsWith("error")) {
                                    throw new RuntimeException("错误数据");
                                }
                                return Long.parseLong(element.split(",")[0]);
                            }
                        }));

        // Turn "timestamp,word,count" into (word, count). An anonymous class (rather than a
        // lambda) keeps the Tuple2 generic types visible to Flink's type extraction.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWatermark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Key by word.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(tp -> tp.f0);
        // 5-second tumbling event-time windows.
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream =
                keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // Tag late records so they are routed to a side output instead of being dropped.
        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowedStream.sideOutputLateData(lateDataTag).sum(1);

        // Late records are fetched from the windowed result by their tag.
        DataStream<Tuple2<String, Integer>> lateDataStream = result.getSideOutput(lateDataTag);

        result.print();
        lateDataStream.print("迟到的数据");
        env.execute();
    }
}


版权声明:本文为JinVijay原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。