How to compute the maximum and minimum in a single pass with Flink

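How can I compute both the maximum and the minimum of the data in a time window that slides at a fixed interval, getting both results in a single pass instead of two separate aggregations?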

    @Override
    public DataStream<StructuredRecord> transform(FlinkExecutionPluginContext context, DataStream<Object> input) {
        return input
                .flatMap(new FlatMapFunction<Object, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(Object o, Collector<Tuple2<String, String>> collector) {
                        StructuredRecord structuredRecord = (StructuredRecord) o;
                        List<Schema.Field> fields = structuredRecord.getSchema().getFields();
                        assert fields != null;
                        Object value = structuredRecord.get(fields.get(0).getName());
                        String[] splits = String.valueOf(value).split(System.lineSeparator());
                        for (String name : splits) {
                            collector.collect(new Tuple2<>(name, name));
                        }
                    }
                })
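                // Sliding window: size is config.timeWindow seconds, slide is config.intervals seconds
                .timeWindowAll(Time.seconds(Integer.parseInt(config.timeWindow)),
                        Time.seconds(Integer.parseInt(config.intervals)))
                // Pass 1: one tuple per window; only field 0 is guaranteed to hold the maximum
                .max(0)
                // Pass 2: this window runs over the per-window max results, not the raw input,
                // so field 1 here is not guaranteed to be the true minimum of the original window
                .timeWindowAll(Time.seconds(Integer.parseInt(config.timeWindow)),
                        Time.seconds(Integer.parseInt(config.intervals)))
                .min(1)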
                .map(new MapFunction<Tuple2<String, String>, StructuredRecord>() {
                    @Override
                    public StructuredRecord map(Tuple2<String, String> tuple) {
                        StructuredRecord.Builder builder = StructuredRecord.builder(FlinkMaxMinCompute.SCHEMA);
                        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                        String time = f.format(LocalDateTime.now());
                        builder.set("message",
                                "(" + time + ") max: " + tuple.getField(0) + ", min: " + tuple.getField(1));
                        return builder.build();
                    }
                });
    }

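The code above first finds the maximum with .timeWindowAll().max() and then finds the minimum with a second .timeWindowAll().min(). That is two passes, because the second window operates on the output of the first, and this is exactly what I want to avoid.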
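One common way to get both extremes in one pass is to keep them together in a single accumulator instead of chaining two windowed aggregations. Flink's AggregateFunction interface (createAccumulator, add, getResult, merge) is designed for this pattern, and the windowed stream's aggregate(...) method applies it once per window. The sketch below is written in plain Java so it runs without Flink on the classpath. Its four methods only mirror that contract. The names MinMaxAggregate and MinMaxAccumulator and the result string format are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical single-pass min/max aggregation in plain Java.
// The four methods mirror the shape of Flink's AggregateFunction<IN, ACC, OUT>.
public class MinMaxAggregate {

    // Accumulator holding both extremes seen so far; public fields and the
    // implicit public no-arg constructor keep it POJO-friendly.
    public static final class MinMaxAccumulator {
        public String min; // null until the first element arrives
        public String max;
    }

    // Starts an empty accumulator for a new window.
    public MinMaxAccumulator createAccumulator() {
        return new MinMaxAccumulator();
    }

    // Updates min and max in the same step, so each element is visited once.
    public MinMaxAccumulator add(String value, MinMaxAccumulator acc) {
        if (acc.min == null || value.compareTo(acc.min) < 0) {
            acc.min = value;
        }
        if (acc.max == null || value.compareTo(acc.max) > 0) {
            acc.max = value;
        }
        return acc;
    }

    // Produces the window result from the accumulator.
    public String getResult(MinMaxAccumulator acc) {
        return "max: " + acc.max + ", min: " + acc.min;
    }

    // Combines two partial accumulators by folding b's extremes into a.
    public MinMaxAccumulator merge(MinMaxAccumulator a, MinMaxAccumulator b) {
        if (b.min != null) {
            add(b.min, a);
        }
        if (b.max != null) {
            add(b.max, a);
        }
        return a;
    }

    public static void main(String[] args) {
        MinMaxAggregate agg = new MinMaxAggregate();
        MinMaxAccumulator acc = agg.createAccumulator();
        List<String> window = Arrays.asList("banana", "apple", "cherry");
        for (String v : window) {
            acc = agg.add(v, acc);
        }
        System.out.println(agg.getResult(acc)); // prints "max: cherry, min: apple"
    }
}
```

To use this idea in the pipeline above, the class would implement org.apache.flink.api.common.functions.AggregateFunction<String, MinMaxAccumulator, String>, and the flatMap would emit plain String values. The chain .timeWindowAll(...).max(0).timeWindowAll(...).min(1) would then collapse into a single .timeWindowAll(size, slide).aggregate(new MinMaxAggregate()), followed by a map that builds the StructuredRecord. Note that compareTo orders strings lexicographically, so "10" sorts before "9". If the values are numeric, parse them to long or double before comparing.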


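Copyright notice: This is an original article by weixin_50337833, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.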