文章目录
Kafka架构详解
Kafka架构图
![[外链图片转存失败(img-eFr4rAYy-1563638477133)(img/kafka02.png)]](https://img-blog.csdnimg.cn/20190721000413812.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNTAyODU4,size_16,color_FFFFFF,t_70)
Zookeeper在Kafka的作用
- 无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
- Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产,消息存储,消息消费的过程结合在一起。
- 同时借助zookeeper,kafka能够将生产者,消费者和broker在内的所有组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者和消费者的负载均衡。
zookeeper在Kafka中保存的meta信息
![[外链图片转存失败(img-kWOjodbK-1563638477134)(img/zknode.png)]](https://img-blog.csdnimg.cn/20190721000354903.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNTAyODU4,size_16,color_FFFFFF,t_70)
- admin目录下保存着标记删除的topic信息;
Kafka文件存储机制
Kafka中消息是以topic进行分类的,生产者通过topic向Kafka broker发送消息,消费者通过topic读取数据,然而topic在物理层面又能以partiton为分组,一个topic可以分为若干个partition,partition还可以细分为segment,一个partition物理上由多个segment组成。
在Kafka文件存储中,同一个topic下有多个不同的partition,每个partiton为一个目录,partition的名称规则为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1,partition是实际物理上的概念,而topic是逻辑上的概念。
上面提到partition还可以细分为segment,这个segment又是什么?如果就以partition为最小存储单位,我们可以想象当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重的影响,所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。
segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命名规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
指定log.segment.bytes=1048576,即每个segment数据文件大小最大为1mb,重启kafka服务,创建topic test04只有一个分区
kafka-topics.sh --create --zookeeper pseudo01:2181 --replication-factor 1 --partitions 1 --topic test04
kafka-console-producer.sh --broker-list pseudo01:9092 --topic test04 --producer.config $KAFKA_HOME/config/producer.properties
kafka-console-consumer.sh --bootstrap-server pseudo01:9092 --zookeeper pseudo01:2181 --topic test04 --from-beginning
向test03写入数据,查看分区目录test04-0
![[外链图片转存失败(img-54P2M8B6-1563638477134)(img/segment.png)]](https://img-blog.csdnimg.cn/20190721000233587.png)
segment的.index和.log文件解析
![[外链图片转存失败(img-7u87N5mQ-1563638477134)(img/segment01.png)]](https://img-blog.csdnimg.cn/2019072100022138.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNTAyODU4,size_16,color_FFFFFF,t_70)
ISR-实现partition在多个副本中Leader的选择
In-Sync Replicas
这个是指副本同步队列。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。那么在之后追上leader,并被重新加入了ISR。AR=ISR+OSR。
Leader的选择Kafka的核心是日志文件,日志文件在集群中的同步时分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.
Kafka并不是使用这种方法。
Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被ISR中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader,ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
LEO(LogEndOffset)
![[外链图片转存失败(img-VQKPpWWf-1563638477134)(img/part01.png)]](https://img-blog.csdnimg.cn/20190721000151667.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNTAyODU4,size_16,color_FFFFFF,t_70)
![[外链图片转存失败(img-YvxwvvET-1563638477135)(img/write02.png)]](https://img-blog.csdnimg.cn/20190721000202310.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNTAyODU4,size_16,color_FFFFFF,t_70)
Topic的创建
- controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
- (1) 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
- (2)将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
- controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
Topic的删除
- controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
总结
消息状态: 在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
消息持久化: Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
消息有效期: Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
批量发送: Kafka支持以消息集合为单位进行批量发送,以提高push效率。
push-and-pull: Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
Kafka集群中broker之间的关系: 不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
负载均衡方面:
同步异步: Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
分区机制partition: Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。离线数据装载: Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。