Flink实现高温预警


import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;

/**
 * Author itcast
 * Date 2021/12/5 15:30
 * Desc - 在服务器运维中,需要实时监控服务器机架的温度,如果一定时间内温度超过了一定阈值(100度),
 * 且后一次上报的温度超过了前一次上报的温度,需要触发告警(温度持续升高中)
 * <p>
 * 1,100
 * 2,101
 * 1,102
 * <p>
 * 1,105
 * 1,101
 * 1,103
 * 1,95
 */
public class TemperatureAlarmProcess {
    //定义logger
    private static final Logger logger = LoggerFactory.getLogger(TemperatureAlarmProcess.class);

    public static void main(String[] args) throws Exception {
        //初始化流计算运行环境,指定并行度为1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置事件时间属性
        //接入socket数据源,获取数据
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //将获取到的数据转换成tuple2<Integer,Integer>
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapStream = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                String[] arr = value.split(",");
                //返回
                return Tuple2.of(
                        Integer.parseInt(arr[0]),
                        Integer.parseInt(arr[1])
                );
            }
        });
        //根据 f0进行分组
        mapStream.keyBy(t -> t.f0)
                //自定义ProcessFunction对象,继承 KeyedProcessFunction<Tuple, Tuple2<Integer, Integer>, String>抽象类
                .process(new MyKeyedProcessFunction())
                //打印输出
                .printToErr();
        //执行任务
        env.execute();
    }

    /**
     * 实现当前服务器id对应的温度告警信息的处理,涉及到触发器,告警信息,状态操作等
     */
    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, String> {
        //初始化 ListState<Tuple2<机架id, 机架温度>>,保存上一次的温度的状态
        ListState<Tuple2<Integer, Integer>> lasthighTemperatureState = null;
        //定义触发警告定时器的时长和格式化为"yyyy-MM-dd HH:mm:ss.SSS"
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        Tuple2<Integer, Integer> lastHighTemperature = Tuple2.of(0, 0);

        //实现如下方法:
        //1.open ,获取ListState Tuple2<Integer, Integer>
        /**
         * 状态管理,open方法获取状态,保存的是上一次的高温的温度信息
         *
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            //获取上一次状态的温度
            lasthighTemperatureState = getRuntimeContext().getListState(new ListStateDescriptor(
                    "highTemperatureState",
                    Types.TUPLE(Types.INT, Types.INT)
            ));
        }

        /**
         * 处理每条数据,注册timer触发器,当满足当前处理时间+5s之后,就会将当前高温信息进行 onTimer触发
         * 如果温度过高并满足 5s,触发 onTimer,告警信息,并收集告警数据
         *
         * @param value
         * @param ctx
         * @param out
         * @throws Exception
         */
        //2.processElement
        @Override
        public void processElement(Tuple2<Integer, Integer> value, Context ctx, Collector<String> out) throws Exception {
            //将listState数据赋值给最后一次温度数据,如果没有赋值为Tupe2<0,0>
            Iterable<Tuple2<Integer, Integer>> lastTemperatures = lasthighTemperatureState.get();
            for (Tuple2<Integer, Integer> lastTemperature : lastTemperatures) {
                if (lastTemperature != null) {
                    lastHighTemperature = lastTemperature;
                } else {
                    lastHighTemperature = Tuple2.of(0, 0);
                }
            }
            //如果温度>100 并且 大于状态温度,将数据保存到listState中,并注册个定时器,5s触发告警,收集温度高告警信息
            int currentTemperature = value.f1;
            long timerTimeStamp = 0L;
            if (currentTemperature > 100 && currentTemperature > lastHighTemperature.f1) {
                //将当前的最高温度保存到状态中
                lasthighTemperatureState.add(value);
                //定义一个触发器
                timerTimeStamp = ctx.timerService().currentProcessingTime() + 5 * 1000L;
                //注册触发器
                ctx.timerService().registerProcessingTimeTimer(timerTimeStamp);
                //提示当前触发器执行的开始时间,上次温度的信息,本次温度的信息,打印输出
                logger.warn("触发timer的时间:%s, 上一次温度是: %s ,本次温度是:%s", sdf.format(timerTimeStamp)
                        , lastHighTemperature,
                        currentTemperature);
                System.out.println("触发timer的时间:" + sdf.format(timerTimeStamp) + ", 上一次温度是: " + lastHighTemperature.f1 + ",本次温度是:" + currentTemperature);
                out.collect("触发高温报警");
            } else {
                //否则 清空状态温度,并取消定时器 ctx.timerService().deleteProcessingTimeTimer(timeTS);
                ctx.timerService().deleteProcessingTimeTimer(timerTimeStamp);
                lasthighTemperatureState.clear();
            }
        }

        //3.onTimer 异步回调函数
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //获取状态中数据size
            Iterable<Tuple2<Integer, Integer>> highList = lasthighTemperatureState.get();
            int size = Lists.newArrayList(highList).size();
            //如果大于 1 触发告警,并收集告警信息
            if (size >= 1) {
                logger.warn("当前的温度过高,告警!");
                out.collect("当前的温度过高,告警!");
                //清空历史高温的数据
                lasthighTemperatureState.clear();
            }
        }
    }
}


版权声明:本文为qq_40870024原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。