import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
/**
* Author itcast
* Date 2021/12/5 15:30
* Desc - 在服务器运维中,需要实时监控服务器机架的温度,如果一定时间内温度超过了一定阈值(100度),
* 且后一次上报的温度超过了前一次上报的温度,需要触发告警(温度持续升高中)
* <p>
* 1,100
* 2,101
* 1,102
* <p>
* 1,105
* 1,101
* 1,103
* 1,95
*/
public class TemperatureAlarmProcess {
//定义logger
private static final Logger logger = LoggerFactory.getLogger(TemperatureAlarmProcess.class);
public static void main(String[] args) throws Exception {
//初始化流计算运行环境,指定并行度为1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置事件时间属性
//接入socket数据源,获取数据
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
//将获取到的数据转换成tuple2<Integer,Integer>
SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapStream = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(String value) throws Exception {
String[] arr = value.split(",");
//返回
return Tuple2.of(
Integer.parseInt(arr[0]),
Integer.parseInt(arr[1])
);
}
});
//根据 f0进行分组
mapStream.keyBy(t -> t.f0)
//自定义ProcessFunction对象,继承 KeyedProcessFunction<Tuple, Tuple2<Integer, Integer>, String>抽象类
.process(new MyKeyedProcessFunction())
//打印输出
.printToErr();
//执行任务
env.execute();
}
/**
* 实现当前服务器id对应的温度告警信息的处理,涉及到触发器,告警信息,状态操作等
*/
private static class MyKeyedProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, String> {
//初始化 ListState<Tuple2<机架id, 机架温度>>,保存上一次的温度的状态
ListState<Tuple2<Integer, Integer>> lasthighTemperatureState = null;
//定义触发警告定时器的时长和格式化为"yyyy-MM-dd HH:mm:ss.SSS"
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Tuple2<Integer, Integer> lastHighTemperature = Tuple2.of(0, 0);
//实现如下方法:
//1.open ,获取ListState Tuple2<Integer, Integer>
/**
* 状态管理,open方法获取状态,保存的是上一次的高温的温度信息
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
//获取上一次状态的温度
lasthighTemperatureState = getRuntimeContext().getListState(new ListStateDescriptor(
"highTemperatureState",
Types.TUPLE(Types.INT, Types.INT)
));
}
/**
* 处理每条数据,注册timer触发器,当满足当前处理时间+5s之后,就会将当前高温信息进行 onTimer触发
* 如果温度过高并满足 5s,触发 onTimer,告警信息,并收集告警数据
*
* @param value
* @param ctx
* @param out
* @throws Exception
*/
//2.processElement
@Override
public void processElement(Tuple2<Integer, Integer> value, Context ctx, Collector<String> out) throws Exception {
//将listState数据赋值给最后一次温度数据,如果没有赋值为Tupe2<0,0>
Iterable<Tuple2<Integer, Integer>> lastTemperatures = lasthighTemperatureState.get();
for (Tuple2<Integer, Integer> lastTemperature : lastTemperatures) {
if (lastTemperature != null) {
lastHighTemperature = lastTemperature;
} else {
lastHighTemperature = Tuple2.of(0, 0);
}
}
//如果温度>100 并且 大于状态温度,将数据保存到listState中,并注册个定时器,5s触发告警,收集温度高告警信息
int currentTemperature = value.f1;
long timerTimeStamp = 0L;
if (currentTemperature > 100 && currentTemperature > lastHighTemperature.f1) {
//将当前的最高温度保存到状态中
lasthighTemperatureState.add(value);
//定义一个触发器
timerTimeStamp = ctx.timerService().currentProcessingTime() + 5 * 1000L;
//注册触发器
ctx.timerService().registerProcessingTimeTimer(timerTimeStamp);
//提示当前触发器执行的开始时间,上次温度的信息,本次温度的信息,打印输出
logger.warn("触发timer的时间:%s, 上一次温度是: %s ,本次温度是:%s", sdf.format(timerTimeStamp)
, lastHighTemperature,
currentTemperature);
System.out.println("触发timer的时间:" + sdf.format(timerTimeStamp) + ", 上一次温度是: " + lastHighTemperature.f1 + ",本次温度是:" + currentTemperature);
out.collect("触发高温报警");
} else {
//否则 清空状态温度,并取消定时器 ctx.timerService().deleteProcessingTimeTimer(timeTS);
ctx.timerService().deleteProcessingTimeTimer(timerTimeStamp);
lasthighTemperatureState.clear();
}
}
//3.onTimer 异步回调函数
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//获取状态中数据size
Iterable<Tuple2<Integer, Integer>> highList = lasthighTemperatureState.get();
int size = Lists.newArrayList(highList).size();
//如果大于 1 触发告警,并收集告警信息
if (size >= 1) {
logger.warn("当前的温度过高,告警!");
out.collect("当前的温度过高,告警!");
//清空历史高温的数据
lasthighTemperatureState.clear();
}
}
}
}
版权声明:本文为qq_40870024原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。