I've been looking into Flink CEP SQL recently, so let's go straight to the code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// standard settings for the execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
// create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
DataStream<Event> stream = env
// Source
.readTextFile("D:\\eclipse_workspace\\test_practise\\FlinkProjectTwo\\src\\main\\resources\\input\\data4.log")
// map
.map(new MapFunction<String, Event>() {
private static final long serialVersionUID = 3358095967152295546L;
@Override
public Event map(String s) throws Exception {
// System.out.println("s:"+s);
String[] arr = s.split(",");
String id = arr[0];
long etime = Long.parseLong(arr[1]);
String ip1 = arr[2];
String ip2 = arr[3];
int type = Integer.parseInt(arr[4]);
String code = arr[5];
// System.out.println("(" + id + ", " + rowtime + ", " + vc + ", " + ", " + vl + ")");
Event event = new Event(id, etime, ip1, ip2, type, code) ;
System.out.println("event -->"+event);
return event;
}
})
// filter (disabled)
// .filter(r -> r.id.equals("ws_001"))
// watermarks
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
private static final long serialVersionUID = -5662153360266536368L;
@Override
public long extractTimestamp(Event wEvent, long l) {
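// etime is used as-is for the event-time timestamp; the sample data below looks like
// epoch seconds, which Flink treats as milliseconds (hence the 1970 dates in the output)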
return wEvent.getEtime();
}
})
)
;
// create a temporary view over the stream, exposing etime as the event-time (rowtime) attribute ts
tableEnv.createTemporaryView("elog", stream, $("id"), $("ip1"), $("ip2"), $("type"), $("code"), $("etime").rowtime().as("ts"));
// tableEnv.createTemporaryView("elog", stream);
tableEnv.executeSql("DESCRIBE elog").print();//注册自定义方法,该方法是将匹配到的事件id收集成逗号分隔的字符串,方便入库存储溯源
tableEnv.registerFunction("aggr_ids", new AggrIdFunction());代码说明:其中,A和B之间是严格相邻的,若使用followedBy ,就是有且只能有一条不同或相同事件间隔(即使是符合A条件的事件,在匹配到B后,结果也会少一条匹配到A事件的最后一条,很是怪异)。但是我们实际需求是想AB之间可以出现多条其他事件,这个目前我还是没有想到办法,如果有好办法的小伙伴可以一起留言探讨。
Table result = tableEnv.sqlQuery("SELECT * " +
"FROM elog " +
" MATCH_RECOGNIZE( " +
// " PARTITION BY id " +
" ORDER BY ts " +
" MEASURES " +
" LAST(A.id) AS eid, " +
" aggr_ids(A.id) ||','|| aggr_ids(B.id) AS ids, " +
// " LAST(A.id) AS ids, " +
" A.ip1 AS ip1, " +
" B.ip2 AS ip2, " +
" 'CEP' AS eName," +
" LAST(B.ts) AS eTime " +
" ONE ROW PER MATCH " +
" AFTER MATCH SKIP PAST LAST ROW " +
" PATTERN (A{2,}? followedByAny B) WITHIN INTERVAL '10' SECOND" +
" DEFINE " +
" A AS A.type = 13002 AND LAST(A.ip2) = FIRST(A.ip2)," +
" B AS A.ip2 = B.ip1 AND B.type = 13003 " +
" ) " +
"");
DataStream<NetAtt> ds = tableEnv.toAppendStream(result, NetAtt.class);
ds.print();
// 执行
env.execute("Flink Cep SQL");自定义聚合函数
Custom aggregate function:

import org.apache.flink.table.functions.AggregateFunction;
public class AggrIdFunction extends AggregateFunction<String, IdsBean>{
private static final long serialVersionUID = 5370314072313101269L;
@Override
public String getValue(IdsBean idsBean) {
// drop the leading comma added by accumulate()
return idsBean.ids.substring(1, idsBean.ids.length());
}
@Override
public IdsBean createAccumulator() {
return new IdsBean();
}
// called once per input row: append the matched id with a comma separator
public void accumulate(IdsBean idsBean, String iValue) {
idsBean.ids = idsBean.ids + "," + iValue;
}
public void resetAccumulator(IdsBean idsBean) {
idsBean.ids = "";
}
}
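The IdsBean accumulator used by AggrIdFunction is not shown in the post either. Given how accumulate() and getValue() use it, a minimal sketch is:

public class IdsBean {
    // accumulate() prepends a comma before every id and getValue() strips the
    // leading one, so the field must start out as an empty string
    public String ids = "";
}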
Test data:
ws_001,1577844001,127.0.0.1,127.0.0.2,13002,ceshi
ws_002,1577844002,127.0.0.1,127.0.0.2,13002,ceshi
ws_003,1577844015,127.0.0.1,127.0.0.2,13002,ceshi
ws_010,1577844016,127.0.0.1,127.0.0.2,13002,ceshi
ws_004,1577844017,127.0.0.1,127.0.0.2,13002,ceshi
ws_005,1577844021,127.0.0.2,127.0.0.3,13003,ceshi
ws_006,1577844022,127.0.0.1,127.0.0.2,13002,ceshi
ws_007,1577844023,127.0.0.1,127.0.0.2,13002,ceshi
ws_008,1577844024,127.0.0.1,127.0.0.2,13002,ceshi
ws_009,1577844025,127.0.0.2,127.0.0.3,13003,ceshi

Execution result:
NetAtt [eid=ws_010, ids=ws_001,ws_002,ws_003,ws_010,ws_005, ip1=127.0.0.1, ip2=127.0.0.3, eName=CEP, eTime=1970-01-19 06:17:24.021]
NetAtt [eid=ws_007, ids=ws_006,ws_007,ws_009, ip1=127.0.0.1, ip2=127.0.0.3, eName=CEP, eTime=1970-01-19 06:17:24.025]

Now modify the test data as follows:
ws_001,1577844001,127.0.0.1,127.0.0.2,13002,ceshi
ws_002,1577844002,127.0.0.1,127.0.0.2,13002,ceshi
ws_003,1577844015,127.0.0.1,127.0.0.2,13002,ceshi
ws_010,1577844016,127.0.0.1,127.0.0.2,13004,ceshi
ws_004,1577844017,127.0.0.1,127.0.0.2,13005,ceshi
ws_005,1577844021,127.0.0.2,127.0.0.3,13003,ceshi
ws_006,1577844022,127.0.0.1,127.0.0.2,13002,ceshi
ws_007,1577844023,127.0.0.1,127.0.0.2,13002,ceshi
ws_008,1577844024,127.0.0.1,127.0.0.2,13002,ceshi
ws_009,1577844025,127.0.0.2,127.0.0.3,13003,ceshi

The execution result then becomes:
NetAtt [eid=ws_007, ids=ws_006,ws_007,ws_009, ip1=127.0.0.1, ip2=127.0.0.3, eName=CEP, eTime=1970-01-19 06:17:24.025]
Copyright notice: this is an original article by gaoshiliang, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when republishing.