Kafka的架构之道-刘宇
作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。
一、Kafka的相关术语

- replica:
每一个分区,根据副本因子N,会有N个副本。比如在broker1上有一个topic,分区为topic-1,副本因子为2,那么 在两个broker的数据目录里,就都有一个topic-1,其中一个是leader,一个follower。 - Segment:
partition物理上由多个segment组成,每个Segment存着message信息。 - Leader:
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 - Follower:
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持 数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。 - Offset:
kafka的存储文件都是按照offset.log来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只
要找到2048.log的文件即可。当然the first offset就是00000000000.log
二、Kafka的架构讲解

通常,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统 CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干 Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在 Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
三、Kafka的分布式模型
kafka分布式主要是指分区被分布在多台server(broker)上,同时每个分区都有leader和follower(不是必须),即老大和小弟的角色,这儿是老大负责处理,小弟负责同步,小弟可以变成老大,形成分布式模型。
kafka的分区日志(message)被分布在kafka集群的服务器上,每一个服务器处理数据和共享分区请求。每一个分区是被复制到一系列配置好的服务器上来进行容错。
每个分区有一个server节点来作为leader和零个或者多个server节点来作为followers。leader处理指定分区的所有 读写请求,同时follower被动复制leader。如果leader失败,follwers中的一个将会自动地变成一个新的leader。 每一个服务器都能作为分区的一个leader和作为其它分区的follower,因此kafka集群能被很好地平衡。kafka集群 是一个去中心化的集群。
kafka消费的并行度就是kaka topic分区的个数,或者分区的个数决定了同一时间同一消费者组内最多可以有多少个消费者消费数据。
四、Kafka的文件存储

- 在kafka集群中,分单个broker和多个broker。每个broker中有多个topic,topic数量可以自己设定。在每个topic中又有0到多个partition,每个partition为一个分区。kafka分区命名规则为topic的名称+有序序号,这个序号从0开始依次增加。
- 每个partition中有多个segment file。创建分区时,默认会生成一个segment file,kafka默认每个segment file的大小是1G。当生产者往partition中存储数据时,内存中存不下了,就会往segment file里面刷新。在存储数据时,会先生成一个segment file,当这个segment file到1G之后,再生成第二个segment file,以此类推。每个segment file对应两个文件,分别是以.log结尾的数据文件和以.index结尾的索引文件。在服务器 上,每个partition是一个目录,每个segment是分区目录下的一个文件。
- 每个segment file也有自己的命名规则,每个名字有20个字符,不够用0填充。每个名字从0开始命名,下一个segment file文件的名字就是,上一个segment file中最后一条消息的索引值。在.index文件中,存储的是 key-value格式的,key代表在.log中按顺序开始第n条消息,value代表该消息的位置偏移。但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小了。.index中存放的消息索引是一个稀疏索引列表。
五、topic中的partition
5.1、为什么要分区
可以想象,如果一个topic就一个分区,要是这个分区有1T数据,那么kafka就想把大文件划分到更多的目录来管理,这就是kafka所谓的分区。
5.2、分区的好处
- 方便在集群中扩展。因为一个topic由一个或者多个partition构成,而每个节点中通常可以存储多个partition,这样就方便分区存储与移动,也就增加其扩展性。同时也可以增加其topic的数据量。
- 可以提高并发。因为一个主题多个partition,而每个主题读写数据时,其实就是读写不同的partition,所以增加其并发。
5.3、单节点partition的存储分布
Kafka集群只有一个broker,默认/var/log/kafka-log为数据文件存储根目录,在Kafka broker中server.properties 文件配置(参数log.dirs=/opt/data/kafka),例如创建2个topic名称分别为test-1、test-2, partitions数量都为 partitions=4 存储路径和目录规则为:
|--test-1-0
|--test-1-1
|--test-1-2
|--test-1-3
|--test-2-0
|--test-2-1
|--test-2-2
|--test-2-3
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为:topic名称+分区编号(有序),第一个partiton序号从0开始,序号最大值为partitions数量减1。
5.4、多节点partition存储分布

5.5、分区分配策略
- 将所有broker(n个)和partition排序
- 将第i个Partition分配到第(i mode n)个broker上
举例说明:
test3的topic,4个分区,2个副本。
[root@bigdata01 kafka]# ./bin/kafka-topics.sh --describe --zookeeper bigdata01:2181/kafka -- topic test3
Topic:test3 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test3 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: test3 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 2,3
Topic: test3 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
第1个Partition分配到第(1 mode 3)= 1个broker上
第2个Partition分配到第(2 mode 3)= 2个broker上
第3个Partition分配到第(3 mode 3)= 3个broker上。
第4个Partition分配到第(4 mode 3)= 1个broker上
5.6、副本分配策略
- 在Kafka集群中,每个Broker都有均等分配Partition的Leader机会。
- 上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker3中parition-0为Leader,Broker1中Partition-0为副本。
- 上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
副本分配算法:
- 将所有N Broker和待分配的i个Partition排序。
- 将第i个Partition分配到第(i mod n)个Broker上。
- 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上。
分区及副本分配举例:
[root@bigdata01 kafka]# ./bin/kafka-topics.sh --describe --zookeeper bigdata01:2181/kafka -- topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test1 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
第0个paritition分配到第(0%3)个broker上,即分配到第1个broker上。0分区的第1个副本在((0+1)%3)=1个 broker
第1个paritition分配到第(1%3)个broker上,即分配到第2个broker上。
第2个paritition分配到第(2%3)个broker上,即分配到第3个broker上。
5.7、数据分配策略
- 如果指定了partition,进入该partition。
- 如果没有指定该partition,但是指定key,通过key的字节数组信息的hashcode值和partition数求模确定partition。
- 如果都没有指定,通过轮询方式进入对应的partition。
5.8、partition中文件存储
如下是一个partition-0的一个存储示意图。
- 每个分区一个目录,该目录中是一堆segment file(默认一个segment是1G),该目录和file都是物理存储于磁盘。
- 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
- 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
- 这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
六、kafka分区中的segemnt 重点


通过上面两张图,我们已经知道topic、partition、segment、.log、.index等文件的关系,下面深入看看segment相关组成原理。
6.1、segment file组成
由2大部分组成,分别为index file和log file(即数据文件),这2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件。
6.2、segment文件命名规则
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,20位数字字符长度,不够的左边用0填充。
6.3、小实验
创建一个topic为test5包含1 partition,设置每个segment大小为1G,并启动producer向Kafka broker写入大量数据。
[root@bigdata01 kafka]# bin/kafka-topics.sh --create --zookeeper bigdata01:2181/kafka --replication-factor 1 --partitions 1 --topic test5
Created topic "test5".
[root@bigdata01 kafka]# ./bin/kafka-console-producer.sh --broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic test5
>nihao beijing
>nihao qianfeng
>124
>123456789
>098765
>111
>222
>666
>999
>laowang
>goudan
不断写很多......
6.3.1、查看segment文件列表
[root@bigdata03 kafka]# ll /opt/data/kafka/test5-0/ #查看分区目录
total 8
-rw-r--r--. 1 root root 10485760 Nov 21 10:50 00000000000000000000.index #segment文件索引文件
-rw-r--r--. 1 root root 1073761826 Nov 21 10:53 00000000000000000000.log #segemnt的log文件
-rw-r--r--. 1 root root 10485760 Nov 21 10:50 00000000000000023060.index
-rw-r--r--. 1 root root 892 Nov 21 10:53 00000000000000023060.log
-rw-r--r--. 1 root root 10485756 Nov 21 10:50 00000000000000003268.timeindex
-rw-r--r--. 1 root root 8 Nov 21 10:52 leader-epoch-checkpoint
6.3.2、查看segment文件中的log文件
- 查看.log
./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
- 输出内容
Starting offset: 0 offset: 0 position: 0 CreateTime: 1577994283622 isvalid: true keysize: -1
valuesize: 1 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1
isTransactional: false headerKeys: [] payload: a
offset: 1 position: 69 CreateTime: 1577994466159 isvalid: true keysize: -1 valuesize: 1 magic: 2
compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false
headerKeys: [] payload: 1
offset: 2 position: 138 CreateTime: 1577994474463 isvalid: true keysize: -1 valuesize: 1 magic: 2
compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false
headerKeys: [] payload: 4
6.3.3、查看segment文件中的index文件
./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log
- 输出内容
Dumping 00000000000000000000.index offset: 0 position: 0
6.4、segment文件的物理结构
6.4.1、index和log文件之间的对应关系
如图,索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。

举例说明:
上述索引文件中元数据6–>266为例,依次在数据文件中表示第6个message(在全局partiton表示第20366个message)、以及该消息的物理偏移地址为266。
6.4.2、message物理结构
一个segment data file由许多message组成,一个message物理结构具体如下:

具体参数详解:
| 关键字 | 解释说明 |
|---|---|
| 8 byte offset | 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message。 |
| 4 byte message size | message大小 |
| 4 byte CRC32 | 用crc32校验message |
| 1 byte “magic" | 表示本次发布Kafka服务程序协议版本号 |
| 1 byte “attributes" | 表示为独立版本、或标识压缩类型、或编码类型。 |
| 4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填 |
| K byte key | 可选 |
| value bytes payload | 表示实际消息数据。 |
七、kafka中消息查找流程
后面内容敬请期待