KafkaConsumer
这个类有多种构造函数,分别传入参数Properties、ConsumerConfig等。
订阅函数
subscribe(Collection topics, ConsumerRebalanceListener listener)
acquireAndEnsureOpen();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
throwIfNoAssignorsConfigured();
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
release();
}
在最开始进行acquireAndEnsureOpen,确保消费者拿到锁,并且不被close。
其次为空判断,这里需要注意list不为空,但内容为空时,执行了取消订阅,因为订阅空主题就等于取消订阅。
接下来清理未分配主题UnassignedPartitions,通过遍历assignedPartitions,找出当前topic参数中属于它的currentTopicPartitions,然后循环TopicPartition,分区里不属于currentTopicPartitions的直接remove,留下的就是传入topics中可以订阅的topic。
调用subscriptions.subscribe实现真正的订阅操作:设置类型,rebalanceListener,订阅和订阅分组。
metadata.setTopics 对于元数据中的topic先清空,再把可以订阅的topics put进去。
最后进行release操作。
release

锁保护消费者免受多线程访问
ref
refcount用于允许获取currentThread的线程进行可重入访问
unsubscribe
刚才说了,取消订阅就等于订阅空的topics,所以设计中也是这样,直接传入参数Collections.EMPTY_SET,其他操作如上。
public void unsubscribe() {
acquireAndEnsureOpen();
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
release();
}
}
assign
fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
1 设置当前自动提交点位
2 在订阅状态中assignFromUser
设置了partitionToState.put(partition, state);
版权声明:本文为qq_35885429原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。