Java KafkaConsumer消费者是如何管理TCP连接的? 何时建立?何时关闭?
何时创建 TCP 连接?
生产者在构建过程中会创建TCP连接,而消费者并不会。
在kafka生产者中的构造器有这样一段代码,可能会造成this指针逃逸,就是在构造器中启动线程,有可能会导致预期和真实结果产生差异。
kafka消费者 TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的
- 发起 FindCoordinator 请求时。
消费者向kafka集群中发送一个找到协调者的请求,希望Kafka告诉他那个Broker是管理他的协调者。 - 连接协调者时。
通过协调者和Broker发生Socket连接,做具体的一些操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。 - 消费数据时。
消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。举个例子,假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。
消费者程序会创建 3 类 TCP 连接
- 确定协调者和获取集群元数据。
- 连接协调者,令其执行组成员管理操作。
- 执行实际的消息获取。
何时关闭 TCP 连接?
手动关闭
比如kill掉
KafkaConsumer.close() 方法
自动关闭
消费者端参数 connection.max.idle.ms
这个参数控制连接时间,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。
第一类连接会在一定时间后关闭,一直运行的连接是第二类和第三类。
如有错误欢迎指正
版权声明:本文为jj89929665原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。