Flink的状态机制
状态类型
- managed State 和 raw state
managed state: 由flink runtime托管,状态是自动存储,自动恢复的,flink在存储管理和持久化上做了一些优化。横向扩展时候,状态能够自动重新分布到多个并行实例上。flink提供常用的数据结构,ListState,MapState
raw state: 由开发者自己管理,以字节数组的形式存储,一般在用户自定义算子时使用
- keyed State: 是KeyedStream上的状态,每个key对应自己的状态,是一种特殊的operator state
operator State: 可以用在所有算子上,每个slot里面的算子实例(算子子任务)共享一个状态,流入这个子任务的数据可以访问和更新这个状态
两种状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间不能互相访问
代码实现
在KeyedStream上使用keyed state时,需要在实现算子时继承对应的Rich函数类,比如说不用FlatMapFunction,而用RichFlatMapFunction
package com.lagou.state;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Keyed-state demo: reads "key,value" lines from a socket, keys the stream by
 * the first field and, for every two values seen for a key, emits
 * (key, average of those two values).
 *
 * <p>The running (count, sum) pair is kept in a {@link ValueState} scoped to the
 * current key. The result stream is printed and also handed to
 * {@code OperaterStateDemo}, a sink that demonstrates operator state.
 */
public class StateDemo {

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        // Sample input, one "key,value" record per line: (1,3)(1,5)(1,7)(1,4)(1,2)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);

        DataStreamSource<String> data = env.socketTextStream("hdp-1", 7777);

        // Parse each "key,value" line into a Tuple2<Long, Long>.
        SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        });

        // Alternative bounded source for local testing:
        // DataStreamSource<Tuple2<Long,Long>> data = env.fromElements(new Tuple2(1l, 3l), new Tuple2(1l, 5l), new Tuple2(1l, 7l), new Tuple2(1l, 4l), new Tuple2(1l, 2l));

        KeyedStream<Tuple2<Long, Long>, Long> keyed = maped.keyBy(value -> value.f0);

        // Stateful processing on the keyed stream: the state below is scoped per key.
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {

            /** Per-key running aggregate: f0 = number of values seen, f1 = their sum. */
            private transient ValueState<Tuple2<Long, Long>> sumState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Register the state handle. The descriptor is created without a default
                // value (that constructor is deprecated); flatMap handles the "no state
                // yet" case by checking for null.
                ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                        })
                );
                sumState = getRuntimeContext().getState(descriptor);
            }

            @Override
            public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                // value() returns null for a key that has no state yet or was just cleared.
                Tuple2<Long, Long> currentSum = sumState.value();
                if (currentSum == null) {
                    currentSum = Tuple2.of(0L, 0L);
                }

                currentSum.f0 += 1;
                currentSum.f1 += value.f1;

                if (currentSum.f0 >= 2) {
                    // Two values collected: emit their (integer) average and start over.
                    long average = currentSum.f1 / currentSum.f0;
                    out.collect(new Tuple2<>(value.f0, average));
                    sumState.clear();
                } else {
                    sumState.update(currentSum);
                }
            }
        });

        flatMaped.print();
        flatMaped.addSink(new OperaterStateDemo(2));

        env.execute();
    }
}
operator state 要通过实现CheckpointedFunction接口或者ListCheckpointed接口(适用范围较小)来使用
实现CheckpointedFunction时,需要重写其中的快照方法snapshotState,以及state的初始化方法initializeState
package com.lagou.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
/**
 * Buffering sink that demonstrates operator state.
 *
 * <ol>
 *   <li>Operator state can be obtained in two ways: by implementing
 *       {@link CheckpointedFunction} (used here) or by implementing ListCheckpointed.</li>
 *   <li>{@link #initializeState} is called when the function instance is initialized
 *       and is where the state handle is created (and restored state read back);
 *       {@link #snapshotState} is called on every checkpoint and copies the current
 *       buffer into the checkpointed state.</li>
 *   <li>{@link #invoke} is called once per record. Every record is appended to an
 *       in-memory buffer so that a checkpoint can persist the not-yet-emitted
 *       records; once the buffer holds at least {@code threshold} records they are
 *       printed and the buffer is cleared.</li>
 * </ol>
 */
public class OperaterStateDemo implements SinkFunction<Tuple2<Long,Long>>, CheckpointedFunction {

    /** Checkpointed copy of the buffer; assigned in {@link #initializeState}. */
    transient ListState<Tuple2<Long, Long>> operatorState;

    /** Number of buffered records that triggers a flush. */
    int threshold;

    /** Records received since the last flush. */
    private List<Tuple2<Long,Long>> bufferedElements;

    /**
     * @param threshold number of buffered records at which the buffer is flushed
     */
    public OperaterStateDemo(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    /**
     * Replaces the checkpointed list with the current contents of the buffer.
     *
     * @param context snapshot context supplied by Flink (unused)
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.out.println("....snapshotState");
        this.operatorState.clear();
        for (Tuple2<Long,Long> element : bufferedElements) {
            operatorState.add(element);
        }
    }

    /**
     * Creates the operator list state and, when restoring, refills the buffer from it.
     *
     * @param context gives access to the operator state store and the restore flag
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("....initializeState");
        // Register the list state that backs the buffer.
        ListStateDescriptor<Tuple2<Long, Long>> operatarDemoDescriptor = new ListStateDescriptor<>(
                "operatarDemo",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                })
        );
        operatorState = context.getOperatorStateStore().getListState(operatarDemoDescriptor);

        // isRestored() is true when the state comes from a previous execution, e.g. the
        // job was restarted after a failure (such as a bad input record) and is recovering.
        if (context.isRestored()) {
            for (Tuple2<Long,Long> element : operatorState.get()) {
                bufferedElements.add(element);
            }
            System.out.println("....context.isRestored():true" + bufferedElements);
        }
    }

    /**
     * Buffers the record and flushes the buffer once it has reached the threshold.
     *
     * @param value   incoming record
     * @param context sink context supplied by Flink (unused)
     */
    @Override
    public void invoke(Tuple2<Long, Long> value, Context context) throws Exception {
        System.out.println("---------invoke..........");
        bufferedElements.add(value);
        // Use >= rather than ==: restored state may already hold threshold or more
        // elements, in which case an equality check would never fire again and the
        // buffer would grow without bound.
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<Long,Long> element : bufferedElements) {
                System.out.println("...out:" + element);
            }
            bufferedElements.clear();
        }
    }
}
广播状态
将模式流广播到下游算子
// Broadcast the pattern stream to all downstream operator instances.
// The descriptor names the broadcast state "patterns" and declares its key type as Void
// and its value type as the MyPattern POJO.
// NOTE(review): `patterns` and `MyPattern` are declared outside this snippet;
// `patterns` is presumably a DataStream<MyPattern> -- confirm against the full example.
MapStateDescriptor<Void, MyPattern> bcStateDescriptor = new MapStateDescriptor<> ("patterns", Types.VOID, Types.POJO(MyPattern.class));
BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(bcStateDescriptor);
state的存储方式
更多详细解释可以参考:
https://zhuanlan.zhihu.com/p/104171679
版权声明:本文为weixin_38813363原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。