kafka学习笔记
Kafka原理篇
Kafka基本概念
Topic
- 主题。在 kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 topic。topic 相当于消息的分类标签,是一个逻辑概念。
Partition
- 分区。topic 中的消息被分割为一个或多个 partition,其是一个物理概念,对应到系统上就是一个或若干个目录。一个 topic 的 partition 数量是 broker 的整数倍。
- Kafka将topic划分为多个partition进行存储拥有两个好处:
- 消息存储扩容:一个文件的存储大小是有限的,但在集群中的多个文件的存储就可以大大增加一个topic能够保存的消息数量。
- 并行读写:通过多个partition文件存储消息,意味着producer和consumer可以并行的读写一个topic。
- 注意:下图是 topic 的解析,对于生产者顺序产生的消息,当随机写到不同的 partition 分区后,消费者在进行消费的时候是不能保证原来消息的顺序的。即消费者消费消息,会丢失顺序。所以,如果保证严格的一致性,那么可以使用单个 partition。
- Kafka 的消息组织方式实际上是三级结构:主题 – 分区 – 消息。主题下的每条消息只会保存在某一个分区中。
- 对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
- 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题。
segment
- 段。将partition进一步细分为了若干的 segment,每个 segment 文件的大小相等。文件大小可以设置。
Broker
- kafka 集群包含一个或多个服务器,每个服务器节点称为一个 broker。
- 假设某 topic 中有 N个 partition,集群中有 M 个 broker,则 broker 与 partition 间的关系:
- 若 N>=M,且 N 是 M 的整数倍,则每个 broker 上会平均存储(N/M)个该 topic 的多个 partition。
- 若 N>M,但 N 不是 M 的整数倍,则会出现 broker 上的 partition 分布不平均,导致 kafka 集群消息不均衡,导致各个 broker 任务不均衡,应尽量避免这种情况。
- 若 N<M,则会有 N 个 broker 上存储该 topic 的一个 partition,剩下的(M-N)个 broker 中没有存储该 topic 的 partition。
Producer
- 生产者。即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者发送的消息默认会负载均衡地写入到该 topic 的各个 partition中。但是,也可以指定该生产者生产的消息存放的partition。
- 如果没有指定具体的partition号,那么Kafka Producer可以通过一定的算法计算出对应的partition号。
- 如果消息指定了key,则对key进行hash,然后映射到对应的partition号。
- 如果消息没有指定key,则使用Round Robin轮询算法来确定partition号,这样可以保证数据在所有的partition上平均分配。
- 另外,Kafka Producer也支持自定义的partition分配方式。客户端提供一个实现了 org.apache.kafka.clients.producer.Partitioner 的类,然后将此实现类配置到Producer中即可。
Consumer
- 消费者。可以从 broker 中读取消息。
- 一个消费者可以消费多个 topic 的消息。
- 一个消费者也可以消费同一个 topic 中的多个 partition 消息。
- 一个 partition 允许多个消费者同时消费。
Consumer Group
- Consumer Group 是 kafka 提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者,它们共享一个公共的ID,即 group ID。组内的所有消费者协调在一起来消费订阅主题的所有分区。
- kafka 保证同一个 Consumer Group 中只有一个 Consumer 会消费某条消息,实际上,kafka 保证的是稳定状态下每一个 Consumer 实例只会消费某一个或多个特定的 partition,而某个 partition 的数据只会被某一个特定的 Consumer 实例所消费。
- N 个 partition可以被 N 个组消费 N 次,不能被 N 个组消费 M 次,其中 M 比 N 大。
- kafka保证了同一个消费组中只有一个消费者实例会消费某条消息,实际上,kafka保证的是稳定状态下每一个消费者实例只会消费一个或多个特定partition数据,而某个partition的数据只 会被某一特定的consumer实例消费,这样设计的劣势是无法让同一个消费组里的consumer均匀 消费,优势是每个consumer不用跟大量的broker通信,减少通信开销,也降低了分配难度。而 且,同一个partition数据是有序的,保证了有序被消费。
- consumer rebalance的控制策略是由每个consumer通过Zookeeper完成的。
Replicas of partition
- 分区副本。副本是一个分区的备份,是为了防止消息丢失而创建的分区的备份。
kafka副本定义
- 同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
副本⻆色
- 采用基于领导者(Leader-based)的副本机制,确保副本中所有的数据都是一致,基于领导者的副本机制的工作原理如下图所示:
- 副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
- 追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
- 当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
为什么追随者副本是不对外提供服务?
- 方便实现“Read-your-writes”。所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,⻢上使用消费者 API 去读取刚才生产的消息。
- 方便实现单调读(Monotonic Reads)。什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
如何创建
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 复制因子不能超过broker 数量
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 5 --partitions 1 --topic test2
Partition Leader
- 每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负载消息读写的 partition。即所有读写操作只能发生于 Leader 分区上。
- partition leader就是broker Controller 选举出来的,ISR中的follower采用的是轮流坐庄的方式。
- 当 partition leader 宕机 ,则立⻢由 broker controller 从 follower 中选举出来一个新的 leader。其实所谓选举, 其实就是“按资排辈”。从 ISR 列表中找到第一个 follower 作为新的 leader。
Partition Follower
- 所有 Follower 都需要从 Leader 同步消息,Follower 与 Leader 始终保持消息同步。Leader 与 Follower 的关系是主备关系,而非主从关系。
- Partition Follower是通过Borker Controller 选举产生的。
ISR
- ISR,In-Sync Replicas,是指副本同步列表,Leader将从ISR这里面选,具有被选举权,没有选举权,Leader是由Broker Controller决定。是存放在 Zookeeper 中的。
- AR,Assigned Replicas,即所有的副本。在最初还没有 Leader 时 ISR = AR。
- OSR,Outof-Sync Replicas。同步过程中,发生延迟并且超过阈值(延迟时间可以设置)的副本将被踢出去,放到OSR。同时Broker Controller会定时发心跳检测所有的OSR,一旦符合条件了就可以再次进到ISR中。
- 数量关系:AR = ISR + OSR + Leader。
同步机制
- ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
- Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是另有“玄机”。
- 这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。
- 这个参数的含义是 Follower 副本能够落后 Leader 副本的最⻓时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
- ISR 是一个动态调整的集合,而非静态不变的。
思考:光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?
- 不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。
- ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
- 如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺。
Unclean 领导者选举(Unclean Leader Election)
- Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
- false:必须等待ISR列表中有副本活过来才进行新的选举。该策略可靠性有保证,但可用性低。
- true:在ISR中没有副本的情况下可以选择任何一个该Topic的partition作为新的leader,该策略可用性高,但可靠性没有保证。
如果你听说过 CAP 理论的话,你一定知道,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。
- 强烈建议你不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
offset
概述
- 偏移量。每条消息都有一个当前 Partition 下唯一的 64 字节的 offset,它是相对于当前分区第一条消息的偏移量。
- Consumer消费消息时,通过指定的offset来定位下一条要读取的消息。值得注意的是,offset的维护是由Consumer全权控制的。
- 当broker中配置的时间到达时,不论消息是否被消费,Kafka都会清理磁盘空间。
消费的消息数位置
- kafka0.8 版本之后offset保存在kafka集群上。kafka0.8 版本之前offset保存在zookeeper上。
- 新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。
- 它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,名称叫 __consumer_offsets,它默认有50个分区。
- 老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易⻅的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
- 不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在ZooKeeper 中是不合适的做法。
- 在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。即:__consumer_offsets。
offset commit
- 当consumer从partition中消费了消息后,consumer会将其消费的消息的offset提交给broker,表示当前partition已经消费到了该offset所标识的消息。
- Consumer从partition中取出一批消息写入到buffer对其进行消费,在规定时间内消费完消息后,会自动将其消费消息的offset提交给broker,以让broker记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,是不会提交offset的。
Consumer 提交位移的方式
- Kafka Consumer 提交位移时会写入该主题,那 Consumer 是怎么提交位移的呢?目前 KafkaConsumer 提交位移的方式有两种:自动提交位移和手动提交位移。
自动提交 vs 手动提交
- enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
- 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
- commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:
- 利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 不希望程序总处于阻塞状态,影响 TPS。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次同步阻塞式提交,位移的正确性
} finally {
consumer.colse();
}
}
更精细的提交offset
commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)
- 它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个OffsetAndMetadata 对象,保存的主要是位移数据。
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
// ...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
count++;
}
}
HW与LEO
- HW,HighWatermark,高水位,表示 Consumer 可以消费的最高 partition 偏移量。HW 保证了 kafka 集群中消息的一致性。
- LEO,Log End Offset,日志最后消息的偏移量。消息在 kafka 中是被写入到日志文件中的,这是当前最后一个消息在 partition 中的偏移量。
- 对于 Leader 新写入的消息,Consumer是不能立刻消费的。Leader会等待该消息被所有 ISR 中的 partition follower 同步后才会更新 HW,将 HW 写入到 ISR 中,此时消息才能被 Consumer 消费。
- HW 的更新速度取决于 kafka 集群中性能最差那个 broker,这是典型的“木桶效应”。
__consumer_offsets
- 该主题的partition默认有50个,计算公式:
Math.abs(groupID.hashCode()) % 50
- __consumer_offsets中数据的有效期为1天。key:group.id+topic+分区号,value就是当前offset的值。
- 每隔一段时间 kafka 内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。
- 写入到__consumer_offsets主题的partition中的offset消息格式为:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
# vim server.properties
# Offsets topic的复制因子
offsets.topic.replication.factor=1
# 事务主题的复制因子(设置更高以确保可用性),内部主题创建将失败,直到群集大小满足此复制因素要求。
transaction.state.log.replication.factor=1
# 覆盖事务主题的min.insync.replicas配置
transaction.state.log.min.isr=1
- 当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。
__consumer_offsets的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。
Rebalance
- Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
触发条件
- 组成员数发生变更。
- 订阅主题数发生变更。
consumer.subscribe(Pattern.compile(“t.*c”)) 就表明订阅所有以字母 t 开头、字母 c 结尾的主题。
- 订阅主题的分区数发生变更。
问题
- 消费暂停
- Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。
其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
- Rebalance 速度慢。
曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。
Broker Controller
如何选举
- broker controller负责管理分区与副本,broker controller 由 zk 负责选举。
- Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
查看controller节点
- 查看controller发生了多少次变化:
controller的作用
- 总的来说是起协调作用的组件。
选举Leader和ISR
- 从分区副本列表中选出一个作为该分区的leader,并将该分区对应所有副本置于ISR列表。
同步元数据信息包括broker和分区的元数据信息
- 控制ZK的/brokers/ids,以及上一个步骤得到的topic下各分区leader和ISR,将这些元数据信息同步到集群每个broker。
- 当有broker或者分区发生变更时及时,更新到集群保证集群每一台broker缓存的是最新元数据。
broker增删监听与处理
- 控制器启动时就起一个监视器监视ZK/brokers/ids/子节点。当存在broker启动加入集群后,都会在ZK/brokers/ids/增加一个子节点brokerId,控制器的监视器发现这种变化后,控制器开始执行broker加入的相关流程并更新元数据信息到集群。
- 控制器启动时就起一个监视器监视ZK/brokers/ids/子节点。当一个broker崩溃时,该broker与ZK的会话失效导致ZK会删除该子节点,控制器的监视器发现这种变化后,控制器开始执行broker删除的相关流程并更新元数据信息到集群。
topic变化监听与处理
- 控制器启动时就起一个监视器监视ZK/brokers/topics/子节点。当通过脚本或者请求创建一个topic后,该topic对应的所有分区及其副本都会写入该目录下的一个子节点。控制器的监视器发现这种变化后,控制器开始执行topic创建的相关流程包括leader选举和ISR并同步元数据信息到集群;且新增一个监视器监视ZK/brokers/topics/<新增topic子节点内容>防止该topic内容变化。
- 控制器启动时就起一个监视器监视ZK/admin/delete_topics/子节点。当通过脚本或者请求删除一个topic后,该topic会写入该目录下的一个子节点。控制器的监视器发现这种变化后,控制器开始执行topic删除的相关流程包括通知该topic所有分区的所有副本停止运行;通知所有分区所有副本删除数据;删除ZK/admin/delete_topics/<待删除topic子节点>。
分区变化监听与变化处理
- 分区重分配监听与处理:分区重分配通过KAFKA管理员脚本执行完成一个topic下分区的副本重新分配broker。
- 分区扩展监听与处理:当创建一个topic后,控制器会增加一个监视器监视ZK/brokers/topics/<新增topic子节点内容>防止该topic内容变化。当通过脚本执行分扩展后会在该目录增加新的分区目录。控制器的监视器发现这种变化后,控制器开始执行分区扩展相应流程如选举leader和ISR并同步。
broker优雅退出
- 相比较broker机器直接宕机或强制kill,通过脚本关闭一个broker我们称为broker优雅退出。即将关闭的broker向控制器发送退出请求后一直阻塞。
- 控制器接收到请求后,执行leader重选举和ISR后响应broker。broker接收后退出。
- 这个比较特殊,不依赖ZK,直接通过broker和控制器RPC通信即可完成。
数据服务
- 控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
- 控制器保存了什么数据:
- 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
- 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
- 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。
控制器故障转移
- 故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。
- 这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。
Zookeeper
- Kafka 通过 zookeeper 来存储集群的meta元数据信息,Zookeeper负责维护和协调broker,负责Broker Controller的选举。
- ZooInspector 可视化工具查看:
Group Coordinator
- 主要用于Consumer Group中的各个成员的offset位移管理和Rebalance。Group Coordinator 同时管理着当前broker的所有消费者组。
Coordinator的作用
- 每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance。
- 根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把各个消费组均匀分配给各个Broker作为coordinator来进行管理的。
- consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
如何选择哪台是coordinator
- 首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。
- 比如说:groupId,“membership-consumer-group” => hash值(数字)=> 对50取模 => 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区,consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。
Kafka工作原理
Broker启动
- 当每个 broker 启动时,会在 ZooKeeper 中的 /brokers/ids 路径下创建一个节点来注册自己,节点 ID 为配置文件中的 broker.id 参数,后注册的 broker 会报 NodeExists 的错。
- 如果不指定 broker.id 或者指定成 -1,节点 ID 会从 reserved.broker.max.id 这个参数加 1 的值开始,这个参数默认值是 1000,所以经常可以看⻅ 1001、1002 的 broker ID。
- broker 的监听:每个 broker 除了注册自身之外,还会监听 /brokers/ids 这个节点,当这个节点下增加或删除子节点时,ZooKeeper 会通知监听了的 broker。每个 broker 创建的节点都是临时节点,如果 broker 下线,/brokers/ids 下对应的节点就会被删除。
注意:broker 下线,只会删除 /brokers/ids 下的节点,其它的节点中可能还包含这个 broker 的 ID,比如 /brokers/topics 下的节点会记录每个分区的副本存储在哪些 broker 上,这些节点不会删除这个 broker 的 ID,因为这个 broker 还有可能恢复,如果恢复不了,可以用相同的 ID 启动一个新的 broker,新的 broker 会代替原来 broker 的位置,开始同步数据。
Controller的作用
- Controller 除了是一个普通的 broker 之外,还是集群的总扛把子,它负责副本 leader 的选举、topic 的创建和删除、副本的迁移、副本数的增加、broker 上下线的管理等等。
- Controller的选举:在 ZooKeeper 中创建 /controller 临时节点节点并注册自己的信息,先注册上的 broker 就是controller。
- Controller的重新选举:选举出新 controller 的同时, /controller_epoch 中的值也会加 1,这个节点记录 controller 的迭代数。
- Controller管理broker的离开和加入。
消息路由策略
在通过 API 方式发布消息时,生产者是以 Record 为消息进行发布的。Record 中包含 key 与 value,value 才是我们真正的消费本身,而 key 用于路由消息所要存放的 partition。消息要写入到哪个 partition 并不是随机的,而是有路由策略的。
- 若指定了 partition,则直接写入到指定的 partition;
- 若未指定 partiton 但指定了key,则通过对 key 的 hash 值与 partition 数量取模,该取模结果就是要选出的 partition 索引;
- 若 partition 和 key 都未指定,则使用轮询算法选出一个 partition。
发送消息分区选择
- KafkaProducer#send() 源码跟踪:
- 默认使用的分区选择器实现类是 DefaultPartitioner,该类的分区选择策略如下:
用户自定义分区
public class MyPartitioner implements Partitioner {
/**
* 通过这个方法来实现消息要去哪一个分区中
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取分区数
int partitions = cluster.partitionsForTopic(topic).size();
// key.hashCode()可能会出现负数 -1 -2 0 1 2
// Math.abs 取绝对值
return Math.abs(key.hashCode() % partitions);
}
@Override
public void close() { }
@Override
public void configure(Map<String, ?> configs) { }
}
- 配置自定义分区类:
properties.put("partitioner.class","com.abc.producer.MyPartitioner");
示例
// 1、给定具体的分区号,数据就会写入到指定的分区中
ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 0, 1, "tianjin");
// 2、不给定具体的分区号,给定一个key值, 这里使用key的 hashcode%分区数=分区号
ProducerRecord<Integer, String> record2 = new ProducerRecord<>("cities", 0, "tianjin");
// 3、不给定具体的分区号,也不给定对应的key ,这个它会进行轮训的方式把数据写入到不同分区中
ProducerRecord<Integer, String> record3 = new ProducerRecord<>("cities", "tianjin");
消费分区分配算法
- Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。
- PartitionAssignor 接⼝⽤于⽤户定义实现分区分配算法,以实现Consumer之间的分区分配。
- Kafka默认采⽤RangeAssignor的分配算法。
RangeAssignor
- RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配⼀个分区。
- 这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,⽐如上图中4个分区3个消费者的场景,C0会多分配⼀个分区。如果此时再订阅⼀个分区数为4的Topic,那么C0⼜会⽐C1、C2多分配⼀个分区,这样C0总共就⽐C1、C2多分配两个分区了,⽽且随着Topic的增加,这个情况会越来越严重。
- 算法实现:
RoundRobinAssignor
- RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。
StickyAssignor
- 尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差。
- 从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每⼀次分配变更相对上⼀次分配做最少的变动(上⼀次的结果是有粘性的)。
- 其⽬标有两点:分区的分配尽量的均衡,每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致。
- StickyAssignor 这个类的源码是上⾯两个类的源码的10倍,有兴趣可以自己看看。
⾃定义分区策略
- 只需要继承AbstractPartitionAssignor并复写其中⽅法即可(当然也可以直接实现PartitionAssignor接⼝),其中有两个⽅法需要复写。
// 分区分配⽅案的实现
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
// 表示了这个分配策略的唯⼀名称
String name();
- ⽽name()⽅法则表示了这个分配策略的唯⼀名称,⽐如之前提到的range,roundrobin和sticky,这个名字会在和GroupCoordinator的通信中返回,通过它consumer leader来确定整个group的分区⽅案(分区策略是由group中的consumer共同投票决定的,谁使⽤的多,就是⽤哪个策略)
// 指定分区分配cell
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);