Flink Basics (5) -- Flink's State Mechanism

State Types

  1. Managed state and raw state
    Managed state: managed by the Flink runtime. The state is stored and recovered automatically, and Flink applies a number of optimizations for storage management and persistence. When scaling out, the state is automatically redistributed across the parallel instances. Flink provides commonly used state data structures such as ListState and MapState (see the sketch right after this list).
    Raw state: managed by the developer, stored as raw byte arrays, and used in custom operators.
  2. Keyed state and operator state
    Keyed state: state on a KeyedStream; each key has its own state. It can be seen as a special form of operator state.
    Operator state: available on all operators; each parallel operator instance (subtask) holds one state, and every record flowing into that subtask can access and update it.
    Both kinds of state are local: each operator subtask maintains the state store for that subtask, and subtasks cannot access each other's state.
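
As a minimal sketch (not part of the original post), this is roughly how the keyed state primitives mentioned above are declared in a rich function; the class KeyedStateKinds and the state names sum/values/counts are made up for illustration:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Hypothetical keyed map function showing how the three common keyed state handles are obtained.
public class KeyedStateKinds extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ValueState<Long> sum;          // one value per key
    private transient ListState<Long> values;        // a list of values per key
    private transient MapState<String, Long> counts; // a map per key

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        values = getRuntimeContext().getListState(new ListStateDescriptor<>("values", Long.class));
        counts = getRuntimeContext().getMapState(new MapStateDescriptor<>("counts", String.class, Long.class));
    }

    @Override
    public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
        Long current = sum.value();                  // null on the first element for this key
        long newSum = (current == null ? 0L : current) + value.f1;
        sum.update(newSum);
        values.add(value.f1);                        // append to this key's list
        counts.put(value.f0, newSum);                // per-key map entry
        return Tuple2.of(value.f0, newSum);
    }
}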

Code Implementation

With a keyed stream, the operator has to be implemented as the Rich variant of the function so that the runtime context, and therefore the keyed state, is accessible: for example, use RichFlatMapFunction instead of FlatMapFunction.

package com.lagou.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateDemo {
    public static void main(String[] args) throws Exception {
        // sample input: (1,3) (1,5) (1,7) (1,4) (1,2)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        DataStreamSource<String> data = env.socketTextStream("hdp-1", 7777);
        SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        });
//        DataStreamSource<Tuple2<Long,Long>> data = env.fromElements(new Tuple2(1l, 3l), new Tuple2(1l, 5l), new Tuple2(1l, 7l), new Tuple2(1l, 4l), new Tuple2(1l, 2l));

        KeyedStream<Tuple2<Long,Long>, Long> keyed = maped.keyBy(value -> value.f0);

        // group by key and apply stateful processing to the stream
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            ValueState<Tuple2<Long, Long>> sumState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // create the state handle in open()
                ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                        }),
                        Tuple2.of(0L, 0L)
                );

                sumState = getRuntimeContext().getState(descriptor);
                super.open(parameters);
            }

            @Override
            public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                // update the state in flatMap()
                Tuple2<Long, Long> currentSum = sumState.value();

                currentSum.f0 += 1;
                currentSum.f1 += value.f1;

                sumState.update(currentSum);

                if (currentSum.f0 == 2) {
                    long average = currentSum.f1 / currentSum.f0;
                    out.collect(new Tuple2<>(value.f0, average));
                    sumState.clear();
                }

            }
        });

        flatMaped.print();
        flatMaped.addSink(new OperaterStateDemo(2));

        env.execute();
    }
}
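
To try this locally, you can feed the socket source with something like nc -lk 7777 on the host used above (hdp-1 in this example) and type lines such as 1,3 and 1,5. With the sample input from the comment, (1,3)(1,5)(1,7)(1,4)(1,2), the keyed state accumulates two values per key and then emits the average, so the job should print (1,4) and then (1,5) (integer division), while the custom sink below buffers every two elements it receives and prints them.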

Operator state is implemented by implementing the CheckpointedFunction interface, or ListCheckpointed (narrower in scope; a sketch of it follows after the example below).
With CheckpointedFunction you override snapshotState, which takes the snapshot, and initializeState, which creates (and, on recovery, restores) the state.

package com.lagou.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * 1. In Flink there are two ways to implement operator state: implement the CheckpointedFunction
 *    interface, or implement ListCheckpointed.
 * 2. Two methods:
 *    initializeState: called once when each function instance is created; the state handle is created (and, on recovery, restored) here.
 *    snapshotState: called on every checkpoint; copies the latest buffered elements into the checkpointed state.
 * 3. invoke:
 *    called once per incoming element; every element is added to the buffer, so that snapshotState can take the data from the buffer at checkpoint time.
 */
public class OperaterStateDemo implements SinkFunction<Tuple2<Long,Long>>, CheckpointedFunction {
    ListState<Tuple2<Long, Long>> operatorState;
    int threshold;

    private List<Tuple2<Long,Long>> bufferedElements;

    public OperaterStateDemo(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.out.println("....snapshotState");
        this.operatorState.clear();
        for (Tuple2<Long,Long> element : bufferedElements) {
            operatorState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("....initializeState");
        // create the operator (list) state handle
        ListStateDescriptor<Tuple2<Long, Long>> operatarDemoDescriptor = new ListStateDescriptor<>(
                "operatarDemo",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                })
        );
        operatorState = context.getOperatorStateStore().getListState(operatarDemoDescriptor);
        if (context.isRestored()) { // true when the job is recovering from a previous checkpoint after a failure
            for (Tuple2<Long,Long> element: operatorState.get()) {
                bufferedElements.add(element);
            }
            System.out.println("....context.isRestored():true" + bufferedElements);
        }

    }

    @Override
    public void invoke(Tuple2<Long, Long> value, Context context) throws Exception {
        System.out.println("---------invoke..........");
        bufferedElements.add(value);

        if(bufferedElements.size() == threshold) {
            // threshold reached: print and clear the buffered elements
            for(Tuple2<Long,Long> element : bufferedElements) {
                System.out.println("...out:" + element);
            }
            bufferedElements.clear();
        }
    }
}
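
The example above uses CheckpointedFunction. As a minimal, hypothetical sketch of the ListCheckpointed alternative mentioned earlier (the class name CountingMap and its counter are made up; ListCheckpointed is narrower in scope and is deprecated in newer Flink releases in favor of CheckpointedFunction), the checkpointed state is simply the list returned from snapshotState:

package com.lagou.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

import java.util.Collections;
import java.util.List;

// Counts the elements seen by this parallel instance and checkpoints the counter.
public class CountingMap implements MapFunction<String, String>, ListCheckpointed<Long> {
    private long count = 0L;

    @Override
    public String map(String value) {
        count++;
        return count + ":" + value;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // the returned list is what gets checkpointed and redistributed on rescaling
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // on recovery, the restored partial counts are summed back together
        count = 0L;
        for (Long c : state) {
            count += c;
        }
    }
}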

Broadcast State

Broadcast the pattern stream to the downstream operators:

// broadcast the pattern stream to all downstream operator instances
MapStateDescriptor<Void, MyPattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class));
BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(bcStateDescriptor);
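
A minimal sketch of how the broadcast stream can then be used, assuming a keyed event stream named events of type DataStream<Tuple2<Long, String>> (events, the key type and the output strings are made up for illustration; only broadcastPatterns, bcStateDescriptor and MyPattern come from the snippet above):

DataStream<String> matches = events
        .keyBy(value -> value.f0)
        .connect(broadcastPatterns)
        .process(new KeyedBroadcastProcessFunction<Long, Tuple2<Long, String>, MyPattern, String>() {
            @Override
            public void processElement(Tuple2<Long, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // read the currently broadcast pattern (stored under the single key null)
                MyPattern pattern = ctx.getBroadcastState(bcStateDescriptor).get(null);
                if (pattern != null) {
                    out.collect("pattern " + pattern + " active for key " + value.f0);
                }
            }

            @Override
            public void processBroadcastElement(MyPattern pattern, Context ctx, Collector<String> out) throws Exception {
                // every parallel instance receives the pattern and stores it in its broadcast state
                ctx.getBroadcastState(bcStateDescriptor).put(null, pattern);
            }
        });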

How State Is Stored (State Backends)

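In the Flink 1.x API used in the examples above, the state backend determines where working state lives and where checkpoints are written. A minimal sketch of configuring it on the execution environment (the HDFS path and host are made up for illustration):

// keep all state on the JobManager heap (small state, local testing)
env.setStateBackend(new MemoryStateBackend());

// keep working state on the TaskManager heap, checkpoints on a filesystem such as HDFS
env.setStateBackend(new FsStateBackend("hdfs://hdp-1:9000/flink/checkpoints"));

// keep state in embedded RocksDB, checkpointed to the filesystem
// (requires the flink-statebackend-rocksdb dependency)
env.setStateBackend(new RocksDBStateBackend("hdfs://hdp-1:9000/flink/checkpoints"));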

For a more detailed explanation, see:
https://zhuanlan.zhihu.com/p/104171679


Copyright notice: this is an original article by weixin_38813363, licensed under the CC 4.0 BY-SA license; when reposting, please include a link to the original source and this notice.