Kafka的消息日志存储

1. 文件目录布局

主题、分区、副本回顾:

  • Kafka中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。
  • 每个主题又可以分为一个或多个分区,分区的数量可以在主题创建时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量offset;
  • 每个分区又拥有多个副本,一个负责读写的leader副本和0或多个负责同步的follower副本;leader副本均匀的分布在多个broker上,实现负载均衡;

日志布局:

  • Kafka中一个副本对应一个日志Log
  • 为了防止Log过大,Kafka又引入了日志分段LogSegment的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理;
  • Log在物理上对应一个文件夹;
  • LogSetment是逻辑上的概念,每个LogSetment对应磁盘上的一个日志文件两个索引文件(偏移量索引和时间戳索引),以及可能的其他文件(比如,以.txnindex为后缀的事务索引文件);

主题、分区、副本和日志文件的对应关系如下:
在这里插入图片描述

  • Log对应一个命名形式为<topic>-<partition>的文件夹;每个副本对应的日志文件夹以分区号为后缀;
  • 向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前的所有LogSegment都不能写入数据;最后一个LogSegment称为activeSegment
  • 为了便于消息的检索,每个LogSegment中的日志文件(以.log为文件后缀)都有对应的两个索引文件:偏移量索引文件(以.index为文件后缀)和时间戳索引文件(以timeindex为文件后缀);
  • 每个LogSegment都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset,偏移量是一个64位的长整数,日志文件和两个索引文件都是根据偏移量命名的;
  • 在创建主题的时候,如果当前broker中不止配置了一个根目录,那么会挑选分区数最少的根目录来完成本次创建任务;

2. 日志索引

两个索引文件

  • 每个日志分段文件对应了两个索引文件(偏移量索引文件和时间戳索引文件),主要用来提高查找消息的效率;
  • 偏移量索引文件:用来建立消息偏移量offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;
  • 时间戳索引文件:根据指定的时间戳(timestamp)来查找对应的偏移量信息;

稀疏索引

  • Kafka中的索引文件以稀疏矩阵sparse index)的方式构造消息的索引,并不保证每个消息在索引文件中都有对应的索引项;
  • 每当写入一定量(由broker端参数log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项时间戳索引项
  • 增大或减小log.index.interval.bytes的值,对应的可以缩小或增加索引项的密度;
  • 稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度;
  • 偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量位置;如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量;然后去日志文件中从返回的偏移量位置开始顺序查找;
  • 时间戳索引文件中的时间戳也保持单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,然后还需要根据偏移量从偏移量索引文件中进行再次定位;
  • 稀疏索引的方式是在磁盘空间内存空间查找时间等多方面的一个折中方案;

日志分段切分

  1. 日志分段达到一定的条件时需要切分,那么对应的索引文件也需要进行切分;
  2. 日志分段文件切分包含以下几个条件,满足其一即可:
  • 日志分段文件的大小超过了参数限制;参数由broker端的log.segment.bytes配置,默认值是1073741824,即1GB
  • 偏移量索引文件或时间戳索引文件的大小达到参数限制,参数由broker端的log.index.size.max.bytes配置,默认值为10485760,即10MB
  • 日志分段中消息的最大时间戳与当前系统的时间戳差值大于参数限制;参数由log.roll.mslog.roll.hours配置,如果同时配置了这两个参数,则log.roll.ms的优先级高。默认情况下只配置了log.roll.hours,值为1687天
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Interger.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset > Interger.MAX_VALUE);

索引文件预分配空间

  • 对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读
  • 而对于当前活跃的日志分段而言,索引文件还会追加更多的索引项,所以被设定为可读写
  • 在索引文件切分的时候,Kafka会关闭当前正在写入的索引文件并设置为只读模式;同时以可读写的模式创建新的索引文件,索引文件的大小由broker端参数log.index.size.max.bytes配置;
  • Kafka在创建索引文件的时候,会为其预分配log.index.size.max.bytes大小的空间,这点是与日志分段文件不同的;只有当索引文件进行切分的时候,Kafka才会把该索引文件裁剪到实际的数据大小;

2.1 偏移量索引文件

偏移量索引文件中,每个索引项分为两部分:relativeOffsetposition

  • relativeOffset:相对偏移量,表示消息相对于baseOffset(每个日志分段的基准偏移量,即当前LogSegment中第一条消息的offset)的偏移量,占用4字节;
  • position:物理地址,也就是消息在日志分段文件中对应的物理地址,占用4字节;

查找指定偏移量(offset)消息的过程:(假如要查找偏移量为274的消息)

  • 首先根据offset(消息偏移量)和每个日志分段的baseOffset(基准偏移量)定位所在的日志分段;假设定位到baseOffset251的日志分段;
  • 根据offsetbaseOffset计算relativeOffset(相对偏移量),根据以下公式,计算relativeOffset23
relativeOffset = offset - baseOffset
  • 偏移量索引文件中找到不大于relativeOffset的最大索引项,假设找到[22,656];
  • 然后从日志分段文件中的物理位置656开始顺序查找偏移量为23的消息;

说明:

  • 根据offset查找baseOffset的日志分段文件不是顺序查找,而是使用跳跃表结构(跳跃表结构可以参考redis笔记,是一种跳跃式的顺序查找),Kafka的每个日志对象中都使用了ConcurremtSkipListMap来保存各个日志分段,每个日志分段的baseOffset作为key,这样可以根据指定偏移量快速定位到消息所在的日志分段;
  • 根据relativeOffset偏移量索引文件中查找最大索引项使用的是二分查找,查找到最大索引项后从日志分段文件中从最大索引项开始顺序查找偏移量为relativeOffset的消息;
  • Kafka强制要求索引文件大小必须是索引项大小的整数倍。对于偏移量索引文件而言,必须为8的整数倍(每个索引项8字节);

2.2 时间戳索引

时间戳索引的每个索引项分为两部分:timestamprelativeOffset,每个索引项占用12字节

  • timestamp:当前日志分段最大的时间戳;
  • relativeOffset:时间戳所对应的消息的相对偏移量,即偏移量索引文件中的relativeOffset;

时间戳单调递增:

  • 每个追加的时间戳索引项中的timestamp必须大于之前追加的索引项的,否则不予追加;

查找指定时间戳(targetTimeStamp)开始的消息的过程:

  • 找到所对应的日志分段:将targetTimestamp和每个日志分段中的最大时间戳largetTimeStamp逐一对比(这里不能使用跳跃表),直到找到不小于targetTimeStamplargetTimeStamp所对应的日志分段;
  • 找到相应的日志分段后,在时间戳索引文件中使用二分查找法查找到不大于targetTimeStamp的最大索引项,获取对应的相对偏移量relativeOffset
  • 在偏移量索引文件中使用二分查找法找到不大于relativeOffset的最大索引项,获取对应的物理地址position
  • 在日志分段中从position处开始顺序查找;

说明:

  • Kafka强制要求索引文件大小必须是索引项大小的整数倍。对于时间戳索引文件而言,必须是12的整数倍(时间戳索引文件的每个索引项为12字节);
  • 时间戳索引文件记录的是时间戳和相对偏移量,找到相对偏移量后还需要去偏移量索引文件中查找不大于偏移量的最大索引项;然后再去日志分段文件中顺序查找;

3. 日志清理

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加,就需要对消息做一定的清理操作。Kafka中每一个分区副本都对应一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略:

  • 日志删除:按照一定的保留策略直接删除不符合条件的日志分段;
  • 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本;

设置日志清理策略:

  • 可以通过broker端的参数log.cleanup.policy来设置日志清理策略,此参数的默认值为delete,即采用日志删除的清理策略;
  • 如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为compact,并且将log.cleaner.enable设定为true(默认值就是true);
  • 还可以同时支持日志删除日志压缩两种策略,将log.cleanup.policy参数设置为delete,compact
  • 日志清理的粒度还可以控制到主题级别,比如与broker的log.cleanup.policy对应的主题级别的参数为cleanup.policy

3.1 日志删除

日志删除是按照一定的保留策略直接删除不符合条件的日志分段。在Kfaka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件。检测周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟

当前日志分段的保留策略有三种:

  • 基于时间的保留策略
  • 基于日志大小的保留策略
  • 基于日志起始偏移量的保留策略

基于时间的保留策略

  • 日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志分段文件集合;
  • 设定的阈值可以通过broker端的log.retention.hourslog.retention.minuteslog.retention.ms来配置,同时配置时,时间单位越小优先级越高;默认情况下只配置了log.retention.hours,值为1687天,即默认情况下日志分段文件的保留时间为7天
  • 查找过期的日志分段文件是根据日志分段中最大的时间戳来计算的;
  • 日志文件中必须要保证有一个活跃的日志分段activeSegment,若待删除的日志分段的总数等于该日志文件中所有的日志分段数量,那么说明所有的日志分段都已经过期,这是,会先切出一个新的日志分段作为activeSegment,然后执行删除操作;

日志删除操作:

  • 删除日志分段时,首先会从Log对象所维护的日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。
  • 然后将日志分段文件所对应的所有文件添加上.deleted的后缀(当然也包括所对应的索引文件)。
  • 最后交由一个以delete-file命名的延迟任务来删除这些以.deleted为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来调配,此参数的默认值为60000,即1分钟

基于日志大小的保留策略

  • 日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize,来寻找可删除的日志分段的文件集合;
  • 设定的阈值由broker端的参数log.retention.bytes来配置,默认值为-1即不限制大小;该参配置的是Log中所有日志文件的总大小,而不是单个日志分段文件的大小,单个日志分段文件(.log文件)的大小由broker端参数log.segment.bytes来限制,默认值为10737418241GB;

日志删除操作:

  • 首先计算日志文件的总大小size和设定的保留阈值retentionSize的差值diff,即为需要删除的日志总大小;
  • 然后从日志文件中第一个日志分段开始进行查找可删除的日志分段的文件集合;
  • 找到可删除的日志分段集合后执行删除操作,删除步骤和基于时间的保留策略的删除操作相同;

基于日志起始偏移量

  • 基于日志起始偏移量的保留策略的判断依据是:某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是,则可以删除此日志分段;
  • 收集完可删除的日志分段的文件集合之后,删除操作同基于时间的保留策略相同;

3.2 日志压缩

  • 日志压缩(Log Compaction)是指对于相同key的不同value值,只保留最后一个版本,是Kafka在默认的日志删除(Log Retention)规则之外提供的一种清理过时数据的方式;
  • Kafka还有一个消息压缩(Message Compression)的概念,消息压缩是指Kafka在消息从生产者传输到broker再到消费者过程中,将多条消息压缩传输;
  • 为了避免当前活跃的日志分段activeSegment成为热点文件,activeSegment不会参与Log Compaction的执行;
  • 墓碑消息tombstone):如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息;墓碑消息用于日志清理时使用;
  • 日志压缩执行过后的日志分段的大小会被原先的日志分段要小,为了防止出现太多的小文件,Kafka在实际执行清理过程中并不对单个的日志分段进行单独清理,而是将日志分段文件中offset0firstUncleanableOffset的所有日志分段进行分组,每个日志分段只属于一组,分组的策略为:按照日志分段的顺序遍历,每组中日志分段的占用空间大小之和不超过segmentSize(可通过broker端参数log.segment.bytes设置,默认值为1GB),且对应的索引文件占用大小之和不超过maxIndexSize(可以通过broker端参数log.index.size.max.bytes设置,默认值为10MB)。同一组的多个日志分段清理过后,只会生成一个新的日志分段;

4. 磁盘存储

  • Kafka依赖于文件系统(更底层的来说就是磁盘)来存储和缓存消息;这就让我们怀疑Kafka采用这种持久化形式能否提供有竞争力的性能;
  • 事实上,磁盘可以比我们预想的要快,也可能比我们预想的要慢,这完全取决去我们如何使用它;

Kafka使用以下技术来提示性能:

  • 消息顺序追加
  • 页缓存技术
  • 零拷贝技术

4.1 消息顺序追加

  • 内存和磁盘的写入分为:线性写入(也称为顺序写入)和随机写入;
  • 有关测试结果表明,顺序写内存的速度最快,其次是顺序写磁盘,然后是随机写内存,最慢的是随机写磁盘;由此可见,顺序写磁盘的速度不仅比随机写磁盘速度快,而且也比随机写内存的速度快;
  • Kafka在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已写入的消息,这种方式属于典型的顺序写盘操作;所以即使Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑;

4.2 页缓存

  • 页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O的操作;
  • 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页是否在页缓存中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的I/O操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程;
  • 当一个进程需要将数据写入磁盘时,操作系统也会检测数据对应的也是否在页缓存中,如果不存在,则会先在页缓存中添加响应的页,最后将数据写入对应的页;被修改过的页就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性;

进程缓存与页缓存

  • 对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次;除非使用Direct I/O的方式,否则页缓存很难被禁止;
  • 在Java中对象的内存开销非常大,通常回收真实数据大小的几倍甚至更多,空间使用率低下;同时Java的垃圾回收会随着堆内数据的增多而变得越来越慢;
  • 所以,使用文件系统并依赖页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间;
  • 并且,即使Kafka服务重启(主机不重启),页缓存还是会保持有效,然而进程内的缓存却需要重建;
  • Kafka中大量使用了页缓存,消息先被写入页缓存,然后由操作系统负责具体的刷盘任务,同时Kafka中还提供了同步刷盘及间断性强制刷盘的功能(fsync);
  • 同步刷盘可以提高消息的可靠性,但是会严重影响性能,消息的可靠性应该由多副本机制来保障,刷盘任务应该交由操作系统去调配;

4.3 零拷贝

零拷贝:是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。

  • 零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换;

不使用零拷贝的文件读取与发送

假设要将磁盘上的文件展示给用户,需要先将文件从磁盘中复制出来放到一个内存buf中,然后将这个buf通过套接字(socket)传输给用户。这个过程是比较低效的,文件经历了4次复制过程:

  • (1) 调用read()时,文件中的内容被复制到了内核模式下的Read Buffer中;
  • (2) CPU控制将内核模式数据复制到用户模式下;
  • (3) 调用write()时,将用户模式下的内容复制到内核模式下的Socket Buffer中;
  • (4) 将内核模式下的Socket Buffer的数据复制到网卡设备中传送;

即,直接与设备交换的是内核模式,与应用程序交互的是用户模式,数据在内核模式和用户模式之间走了一圈,浪费了2次复制过程:第一次从内核模式复制到用户模式,第二次从用户模式复制到内核模式;

零拷贝技术

  • 采用零拷贝技术,应用程序可以直接请求内核把磁盘中的数据传输给Socket
  • 零拷贝技术通过DMA(Direct Memory Access)技术,将文件内容复制到内核模式下的Read Buffer中,不过没有数据被复制到Socekt Buffer,DMB引擎直接将数据从内核模式中传递到网卡设备,这里数据只经历了2次复制,就从磁盘中传送出去了,并且上下文切换也变成了2次;

参考文献

  • 《深入理解Kafka核心设计与实践原理》 朱忠华 著,电子工业出版社.

版权声明:本文为godloveayuan原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。