在flink中有state可用用来记录工作状态,进行相关的数据计算,同时状态也是可以用来当做缓存使用的。
使用MapSatate进行数据缓存作为Redis和数据流的中间结果存储。
首先获取flink上下文
RuntimeContext runtimeContext = getRuntimeContext()
其次定义map描述器
//定义描述器
MapStateDescriptor<String, LbsInfo> lbsInfoState =
new MapStateDescriptor<>(
"lbsInfoState",
String.class, LbsInfo.class);
//设置TTL过期 10分钟会清理一次缓存数据
StateTtlConfig config = StateTtlConfig
.newBuilder(Time.seconds(10 * 60 * 1000))
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
//TTL配置
lbsInfoState.enableTimeToLive(config);
然后从上下文中获取定义好的state数据
MapState<String, HashMap> timeIntervalMap = runtimeContext.getMapState(lbsInfoState);
判断状态中是否有数据 没有则从redis中获取
有则直接取
if (null == timeIntervalMap || timeIntervalMap.size() == 0) {
String redisData = RedisUtil.get("data");
timeIntervalMap.put(key,redisData);
}
//开始处理拿到的缓存数据
当然针对业务逻辑还有更深入的处理。。。
版权声明:本文为weixin_42169168原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。