KAFKA消费者源码解析

KafkaConsumer

这个类有多种构造函数,分别传入参数Properties、ConsumerConfig等。

订阅函数

subscribe(Collection topics, ConsumerRebalanceListener listener)

acquireAndEnsureOpen();
        try {
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            } else if (topics.isEmpty()) {
                // treat subscribing to empty topic list as the same as unsubscribing
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }

                throwIfNoAssignorsConfigured();
                fetcher.clearBufferedDataForUnassignedTopics(topics);
                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                this.subscriptions.subscribe(new HashSet<>(topics), listener);
                metadata.setTopics(subscriptions.groupSubscription());
            }
        } finally {
            release();
        }

在最开始进行acquireAndEnsureOpen,确保消费者拿到锁,并且不被close。
在这里插入图片描述

其次为空判断,这里需要注意list不为空,但内容为空时,执行了取消订阅,因为订阅空主题就等于取消订阅。

接下来清理未分配主题UnassignedPartitions,通过遍历assignedPartitions,找出当前topic参数中属于它的currentTopicPartitions,然后循环TopicPartition,分区里不属于currentTopicPartitions的直接remove,留下的就是传入topics中可以订阅的topic。
调用subscriptions.subscribe实现真正的订阅操作:设置类型,rebalanceListener,订阅和订阅分组。

metadata.setTopics 对于元数据中的topic先清空,再把可以订阅的topics put进去。

最后进行release操作。

release

在这里插入图片描述

锁保护消费者免受多线程访问

ref

refcount用于允许获取currentThread的线程进行可重入访问

unsubscribe

刚才说了,取消订阅就等于订阅空的topics,所以设计中也是这样,直接传入参数Collections.EMPTY_SET,其他操作如上。

public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
            this.subscriptions.unsubscribe();
            this.coordinator.maybeLeaveGroup();
            this.metadata.needMetadataForAllTopics(false);
            log.info("Unsubscribed all topics or patterns and assigned partitions");
        } finally {
            release();
        }
    }

assign

fetcher.clearBufferedDataForUnassignedPartitions(partitions);

                // make sure the offsets of topic partitions the consumer is unsubscribing from
                // are committed since there will be no following rebalance
                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

                log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                this.subscriptions.assignFromUser(new HashSet<>(partitions));
                metadata.setTopics(topics);

1 设置当前自动提交点位
2 在订阅状态中assignFromUser

设置了partitionToState.put(partition, state);


版权声明:本文为qq_35885429原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。