前言
分析基于的版本是Flink 1.12.1 , Kafka 是 2.11 。 下面是在IDEA 里边直接依赖的包的截图 。

FlinkKafkaConsumer
Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.1</version>
</dependency>
Flink Kafka Consumer 特性
Flink Kafka Consumer 直接依赖的关键 Class 是 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer , 这个类有两个方面的指责,一是作为Flink 框架支持的一种 Source, 要适配Source 接口的相关逻辑保证正确接入Flink,第二个是与Kafka 本身的Consumer 方式交互的逻辑 。 官方给出使用Flink Kafka Consumer 的简单例子 :
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
从官方给出的例子可以看出,使用 Flink Kafka Consumer 的三个要素
1、 Kafka 的Topic 或则会是 Topic的一个列表 (支持正则匹配的方式),表征从Kafka Cluster 哪里获取数据
2、 数据如何反序列化,与Kafka 自身的Consumer 使用模式类似,用户APP 需要负责将read 的bytes 还原成业务操作的对象,涉及到的接口就是 DeserializationSchema / KafkaDeserializationSchema
3、 Kafka Consumer的配置属性 ,如“bootstrap.servers”,“group.id” 必须存在,其他的控制属性如 poll超时时间,每次poll多少条等
引用自官方
- The topic name / list of topic names
- A DeserializationSchema / KafkaDeserializationSchema for deserializing the data from Kafka
- Properties for the Kafka consumer. The following properties are required:
- “bootstrap.servers” (comma separated list of Kafka brokers)
- “group.id” the id of the consumer group
Flink Kafka Consumer 相关源代码分析
入口类 FlinkKafkaConsumer,需要一个范型指定SOURCE 输出到 Flink Stream 中的Record 类型 T,这个T 是通过用户配置的 反序列化类实现的~
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> FlinkKafkaConsumer的构造函数重载的形式,最终都是调用最后一个构造器完成对象的创建
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)private FlinkKafkaConsumer( List<String> topics, Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
private FlinkKafkaConsumer( List<String> topics, Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) { super( topics, subscriptionPattern, deserializer, getLong( checkNotNull(props, "props"), KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED), !getBoolean(props, KEY_DISABLE_METRICS, false)); this.properties = props; setDeserializer(this.properties); // configure the polling timeout try { if (properties.containsKey(KEY_POLL_TIMEOUT)) { this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); } else { this.pollTimeout = DEFAULT_POLL_TIMEOUT; } } catch (Exception e) { throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); } }
上边这个构造函数的核心流程,super 是调用父类的构造起,之后是将传入的properties赋值到该对象的成员变量 this.properties 。
比较有意思的是最后进行 deserialize 的接口就只是 KafkaDeserializationSchema,也就就算用户DeserializationSchema 传入接口的实现,也会被包装成KafkaDeserializationSchema,后便会对比一下两者的区别[1]
setDeserializer(this.properties); 这一句比较有意思,如下是检查properties 里边配置的 Kafka Clients 用的serde类,如过配置了会提示 Flink Kafka Connector 会忽略用户配置的类,采用org.apache.kafka.common.serialization.ByteArrayDeserializer,这个反序列化类其实就是什么也没有做,deserialize 返回 byte[] , Flink 意图很明显,“unpack message的事情不用Kafka Clients 来做了”。
/**
* Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
*
* @param props The Kafka properties to register the serializer in.
*/
private static void setDeserializer(Properties props) {
final String deSerName = ByteArrayDeserializer.class.getName();Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
接下来看一下FlinkKafkaConsumer的父类,
/** * Base constructor. * * @param topics fixed list of topics to subscribe to (null, if using topic pattern) * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics) * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects. * @param discoveryIntervalMillis the topic / partition discovery interval, in * milliseconds (0 if discovery is disabled). */ public FlinkKafkaConsumerBase( List<String> topics, Pattern topicPattern, KafkaDeserializationSchema<T> deserializer, long discoveryIntervalMillis, boolean useMetrics) { this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern); this.deserializer = checkNotNull(deserializer, "valueDeserializer"); checkArgument( discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0, "Cannot define a negative value for the topic / partition discovery interval."); this.discoveryIntervalMillis = discoveryIntervalMillis; this.useMetrics = useMetrics; }
这个类是确定是要和Flink Source 机制取得联系了, 以Source Operator UDF的形式实现, 正常情况下我们自定义一个Flink 的Source 也是继承 SourceFunction或者 RichSourceFunction, 这里的RichParallelSourceFunction 可以就理解为 RichSourceFunction, Parallel 就是类继承体系里边的一个标记, Flink 这行框架运行时通过这个信息能做一些 specific 特意化的调整,这个没有太深入研究。其余的代码比较简单就是一些对象如KafkaTopicsDescriptor的创建,和控制参数的赋值。
到这里,在Flink的Client端执行完成 FlinkConsumer的构造。
接下来是在Flink Runtime 框架中进行初始化,因为此类FlinkKafkaConsumerBase继承的是类 RichSourceFunction,所以生命周期控制有open 和close 两个函数,以及运行source 流程的run函数
open ,这个函数比较长,就不一次性贴出来了
public void open(Configuration configuration) throws Exception { // determine the offset commit mode this.offsetCommitMode = OffsetCommitModes.fromConfiguration( getIsAutoCommitEnabled(), enableCommitOnCheckpoints, ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
首先确定 offsetCommitMode, 值是一个 enum 类型,取值有 DISABLED (不做kafka commit) ,ON_CHECKPOINTS (checkpoint 开启的时候随着checkpoint做commit),KAFKA_PERIODIC (checkpoint没有开启,同Kafka Clients的auto commit)
// create the partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>(); ## 调用 partitionDiscoverer 初始化探测 topic partition final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
接下来是 partition discover,这个特性是什么意思呢,官方有解释 叫Kafka Consumers Topic and Partition Discovery,中心思想是对于 Flink 运行过程中,kafka上topic partition的新增 , flink kafka 能够检测到并且消费它(从earliest)而不用重启Flink 作业。这里就是把 this.partitionDiscoverer 量当成一个功能对象使用,后便深入分析 KafkaPartitionDiscoverer 的实现 [2]
这里的allPartitions经过partitionDiscover 返回的是处理过的该SubTask真正处理Partitions集合,是应用待处理的Kafka topic partitions 的一个子集。

接下来是两个大的分支, 如果restoredState != null 表明是从检查点恢复作业导致的启动, 另外一个就是正常的启动 。 restoredState 的赋值是因为当前这个open 方法调用前会调用 checkpoint 相关的初始化,也就是CheckpointedFunction的initializeState 方法[3]。
先说如果restoredState存在,则需要恢复式启动
for (KafkaTopicPartition partition : allPartitions) { if (!restoredState.containsKey(partition)) { restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); } }
这里将从checkpoint state 恢复的 topic partition 作为正常的消费,那么通过 partitionDiscover的partition 就是属于动态发现的topic partition, flink 会从 “earliest” 的位置开始消费。
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) { // seed the partition discoverer with the union state while filtering out // restored partitions that should not be subscribed by this subtask if (KafkaTopicPartitionAssigner.assign( restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks()) == getRuntimeContext().getIndexOfThisSubtask()){ subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); } }
subscribedPartitionsToStartOffsets 表示当前的 Kafka Source Operator instance 要消费的topic partition 列表分,需要使用到KafkaTopicPartitionAssigner.assign 函数进行分配。 这里输入 KafkaTopicPartition,NumberOfParallelSubtasks, 计算这个 KafkaTopic partition会分配给哪一个 “SubTask” , 用index 表示。 这里提及一下, RichSourceFunction在 open操作时的上下文中,能够提供这个算子物理实例的个数和当前的Source 是第几个实例 。分配算法很简单,对于一个topic中的所有partition ,就是一个Round Robin 方式分配 。具体代码如下:
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; // here, the assumption is that the id of Kafka partitions are always ascending // starting from 0, and therefore can be used directly as the offset clockwise from the start index return (startIndex + partition.getPartition()) % numParallelSubtasks; }
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) { subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> { if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) { LOG.warn( "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.", entry.getKey()); return true; } return false; }); }
当前的source subtask 已经确定了待消费的 TopicPartiton了, 如上接下来的代码就是通过 topic Descriptor 过滤掉不符合条件的 Topic Partitions 。至此,从checkpoint中恢复启动的 部分过程过程完成。
open 中的正常启动
首先是根据 StartupMode (SPECIFIC_OFFSETS,TIMESTAMP,EARLIEST,LATEST,GROUP_OFFSETS ) 来确定当前的 subTask 消费的 Kafka 的 Topic Partitions ,分三种情况
第一种情况, SPECIFIC_OFFSETS 应用指定每一个TopicPartition从哪个位置开始消费
case SPECIFIC_OFFSETS: if (specificStartupOffsets == null) { throw new IllegalStateException( "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + ", but no specific offsets were specified."); } for (KafkaTopicPartition seedPartition : allPartitions) { Long specificOffset = specificStartupOffsets.get(seedPartition); if (specificOffset != null) { // since the specified offsets represent the next record to read, we subtract // it by one so that the initial state of the consumer will be correct subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); } else { // default to group offset behaviour if the user-provided specific offsets // do not contain a value for this partition subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); } }
要保证 应用配置的specificStartupOffsets 非空,然后遍历partitionDiscover 首次探测的 seed TopicPartition ,如果应用自定义配置了该 Partition的 offset 则 使用它,否则默认用 zk/broker 上的commit offset
第二种情况,根据用户配置的 timestamp确定消费的offset
case TIMESTAMP: if (startupOffsetsTimestamp == null) { throw new IllegalStateException( "Startup mode for the consumer set to " + StartupMode.TIMESTAMP + ", but no startup timestamp was specified."); } for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) { subscribedPartitionsToStartOffsets.put( partitionToOffset.getKey(), (partitionToOffset.getValue() == null) // if an offset cannot be retrieved for a partition with the given timestamp, // we default to using the latest offset for the partition ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET // since the specified offsets represent the next record to read, we subtract // it by one so that the initial state of the consumer will be correct : partitionToOffset.getValue() - 1); }
通过函数fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp),将 discoverd partitions 转换成应用指定的 timestamp对应offset开始消费。 结果有两种,一是partition中所有的records timestamp 都在app 配置的timestamp之前, offet = null, subTask 从 lastest 开始消费,否则从对应offset -1 位置开始消费 。
fetchOffsetsWithTimestamp函数是抽象的,在子类FlinkKafkaConsumer中 实现。如下: 实现思路是这个函数里边创建一个短暂的 KafkaConsumer 对象,调用
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,从kafka broker 获取 partition -> offset 的信息// use a short-lived consumer to fetch the offsets; // this is ok because this is a one-time operation that happens only on startupprotected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp( Collection<KafkaTopicPartition> partitions, long timestamp) {
第三种情况,startupMode 不是前两种的默认情况,就是将 allPartitions 全copy到 subscribedPartitionsToStartOffsets
default: for (KafkaTopicPartition seedPartition : allPartitions) { subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel()); }
经过上述三种情况,对于正常启动Flink Kafka Consumer open 阶段,完成了subTask 需要消费的partitions->offset 的列表,在open 方法的最后
this.deserializer.open( RuntimeContextInitializationContextAdapters.deserializationAdapter( getRuntimeContext(), metricGroup -> metricGroup.addGroup("user") ) );
是将反序列化的类进行open操作,即如果 deserializer 有一些init 操作需要在SubTask中进行一次处理,可以在open 里实现,Flink Kafka 的Consumer 框架会调用这个方法 。实际上 Flink Kafka 对于 反序列化的使用和 Kafka Client 不太一样, 我们一般使用 Kafka Clients的原生 API 是通过 properties 配置进一个字符串是 对应类的 全限定类名字, Kafka 内部逻辑会加载这个字符串对应的 class 并实例化来进行 反序列化操作。 这里呢, 这个deserializer需要在Flink Client 端创建,经过序列化传递到Flink framework ,然后到 TaskManager 的Slot 里在open 一下完成最后的初始化 。
至此 整个的 open 方法完成, FlinkConsumer 可用了。接下来就是run方法,在一个SubTask中真正读取Kafka topic 的逻辑
public void run(SourceContext<T> sourceContext) throws Exceptionrun里边的代码并不多,逻辑很简单,主要就是初始化一些metrics counter,kafka commit 的回调对象offsetCommitCallback, 以及 KafkaFetcher的创建和执行,当然也有Fetcher 的 runWithPartitionDiscovery 逻辑,这片文章不进一步介绍 PartitionDiscover 。 这里边着重看一下 KafkaFetcher , FlinkConsumerBase run方法将重要的和 Kafa Client 交互的东西都委托给 KafkaFetcher ,成员变量名字 kafkaFetcher
/** * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the * data, and emits it into the data streams. * * @param sourceContext The source context to emit data to. * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets. * @param watermarkStrategy Optional, a serialized WatermarkStrategy. * @param runtimeContext The task's runtime context. * * @return The instantiated fetcher * * @throws Exception The method should forward exceptions */
// from this point forward: // - 'snapshotState' will draw offsets from the fetcher, // instead of being built from `subscribedPartitionsToStartOffsets` // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to // Kafka through the fetcher, if configured to do so) this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, watermarkStrategy, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics); if (!running) { return; } // depending on whether we were restored with the current state version (1.3), // remaining logic branches off into 2 paths: // 1) New state - partition discovery loop executed as separate thread, with this // thread running the main fetcher loop // 2) Old state - partition discovery is disabled and only the main fetcher loop is executed if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { kafkaFetcher.runFetchLoop(); } else {
这个方法在 FlinkConsumer 这个子类中实现是这个样子的
protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception { // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS; // this overwrites whatever setting the user configured in the properties adjustAutoCommitConfig(properties, offsetCommitMode); return new KafkaFetcher<>( sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), runtimeContext.getUserCodeClassLoader(), runtimeContext.getTaskNameWithSubtasks(), deserializer, properties, pollTimeout, runtimeContext.getMetricGroup(), consumerMetricGroup, useMetrics); }
很多参数,用用到 sourceContext 来emit 从kafka 中读到的record, assignedPartitionsWithInitialOffsets 是具体Kafka的Consumer 要消费 topicPartitionList ,等等。
KafkaFetcher 的类结构有两层, 基类 org.apache.flink.streaming.connectors.kafka.internals 和实现类 KafkaFetcher
/** * Base class for all fetchers, which implement the connections to Kafka brokers and * pull records from Kafka partitions. * * <p>This fetcher base class implements the logic around emitting records and tracking offsets, * as well as around the optional timestamp assignment and watermark generation. * * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into * the Flink data streams. * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version. */ @Internal public abstract class AbstractFetcher<T, KPH> {
从AbstractFetcher 的注释中看到, 基类中完成到 brokers 的连接,和pull records, 发射records 到 flink runtime ,跟踪offsets等等
KafkaFetcher 也留到下一片文章介绍,这里留白 。
至此, FlinkConsumer 的创建,初始化,运行流程结束
其他
关于CheckPoint
FlinkConsuer 整个类继承结构继承了 CheckpointListener, CheckpointedFunction。
通过CheckpointListener的 notifyCheckpointComplete(long checkpointId) 方法,在Flink checkpoint 完成时候,能够通知作为external 的kafka broker 做一些事物操作,这里就是
offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的时候, Flink 的checkpoint 触发手动commit 本任务的 partition offset 到 zookeeper或则会broker 。
CheckpointListener的notifyCheckpointAborted 方法则没有作用
CheckpointedFunction的 snapshotState 方法则是在checkpoint 时候触发 FlinkConsumer在 statebackend 保存好 topic partition 的 offset
而CheckpointedFunction#initializeState方法 则是在FlinkConsumer 初始化的时候,对于从 checkpoint恢复的情况时候用于读取 算子状态,也就是 partition 的offset 的一个 map 。
这里都没有详细说明,如果开发过自己的flink source function 应该对这里的checkpoint 相关接口和类的使用都比较熟悉。
关于Metrics
Flink 对于Metrics 有很好的支持,也支持用户自定义metrics, 在 Flink consumer 有若干的metrics 来记录 kafka source 的运行情况,这些metrics 的名称记录在类 KafkaConsumerMetricConstants 中
public static final String KAFKA_CONSUMER_METRICS_GROUP = "KafkaConsumer"; // ------------------------------------------------------------------------ // Per-subtask metrics // ------------------------------------------------------------------------ public static final String COMMITS_SUCCEEDED_METRICS_COUNTER = "commitsSucceeded"; public static final String COMMITS_FAILED_METRICS_COUNTER = "commitsFailed"; // ------------------------------------------------------------------------ // Per-partition metrics // ------------------------------------------------------------------------ public static final String OFFSETS_BY_TOPIC_METRICS_GROUP = "topic"; public static final String OFFSETS_BY_PARTITION_METRICS_GROUP = "partition"; public static final String CURRENT_OFFSETS_METRICS_GAUGE = "currentOffsets"; public static final String COMMITTED_OFFSETS_METRICS_GAUGE = "committedOffsets"; // ------------------------------------------------------------------------ // Legacy metrics // ------------------------------------------------------------------------ public static final String LEGACY_CURRENT_OFFSETS_METRICS_GROUP = "current-offsets"; public static final String LEGACY_COMMITTED_OFFSETS_METRICS_GROUP = "committed-offsets";
可以通过KEY_DISABLE_METRICS = "flink.disable-metrics" 配置 properties 来配置,默认是 disable 是false ,也就是开启metrics 。
总结
Flink Kafka connector 的 consumer 部分包含很多,基本思想是在每一个Subtask里基于/复用 kafka clients 的consumer实例,是对于 KafkaConsumer的包装,但是又要符合Flink Source Function的需要,涉及到分布式任务中每一个SubTask 中topic partition的分配, 有partition discover , KafkaFetcher 等部分。 对于broker端topic partitons 的变化 Flink source 要能够适应性调整。
大家有没有想过为什么FlinkConuser 里边需要 PartitionDiscover ,原生的Kafka Client 中 KafkaConsumer 不是能够在user group里 rebalance ?
大家有没有想过如果利用Flink Consumer 只消费一个topic 的部分partition 能办到么 ? 如果不能,为什么这么设计
这篇文章遗留了一些技术点没有完全解释清楚,作者经历有限,接下来会在后续文章中继续来学习。
自己挖的坑是一定要填的~
坑列表~
[1] flink kafka 的反序列化序列化说明
[2]partitionDiscoverer 机制
[3] Flink Kafka 的 initializeState
就是从 Flink 的上下文中获取 state
public final void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME, createStateSerializer(getRuntimeContext().getExecutionConfig()))); if (context.isRestored()) { restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); // populate actual holder for restored state for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) { restoredState.put(kafkaOffset.f0, kafkaOffset.f1); } LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } }
/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";/** * The offsets to restore to, if the consumer restores state from a checkpoint. * * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method. * * <p>Using a sorted map as the ordering is important when using restored state * to seed the partition discoverer. */ private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;