nio、pageCache、mmap、sendfile、pageCache预读和顺序读写、bufferCache、磁盘块、扇区、rocketmq的实现

前言

讲一讲 nio、pageCache、mmap、sendfile、page cache预读和顺序读写、bufferCache、磁盘块、rocketmq的实现

一、nio

1.1 NIO概念

Java NIO(New IO)是从Java 1.4版本开始引入的 一个新的IO API,可以替代标准的Java IO API。 NIO与原来的IO有同样的作用和目的,但是使用 的方式完全不同,NIO支持面向缓冲区的、基于 通道的IO操作。NIO将以更加高效的方式进行文 件的读写操作。

ionio
面向流(Stream Oriented)面向缓冲区(Buffer Oriented)
阻塞IO(Blocking IO)非阻塞IO(Non Blocking IO)

1.2 缓冲区(其实就是数组)

Java NIO系统的核心在于:通道(Channel)和缓冲区 (Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
缓冲区类型
Buffer 底层维护了一个数组,可以保存多个相同类型的数据。根据数据类型不同(boolean 除外) ,有以下 Buffer 常用子类:
在这里插入图片描述
在这里插入图片描述
缓冲区基本属性
容量 (capacity) :表示 Buffer 最大数据容量,缓冲区容量不能为负,并且创建后不能更改。

限制 (limit):第一个不应该读取或写入的数据的索引,即位于 limit 后的数据 不可读写。缓冲区的限制不能为负,并且不能大于其容量。

位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制。

标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的 mark() 方法 指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这 个 position。

1.3 通道

通道(Channel):由 java.nio.channels包定义的。Channel 表示 IO 源与目标打开的连接,如socketChannel代表TCP连接,DatagramChannel代表UDP连接。 Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。

通道类型
Java 为 Channel 接口提供的最主要实现类如下:
FileChannel:用于读取、写入、映射和操作文件的通道。
DatagramChannel:通过 UDP 读写网络中的数据通道。
SocketChannel:通过 TCP 读写网络中的数据。
ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
在这里插入图片描述
在这里插入图片描述

1.4 非阻塞式网络通信

传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write() 时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不 能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理, 当服务器端需要处理大量客户端时,性能急剧下降。

Java NIO是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同 时处理连接到服务器端的所有客户端。

Selector
选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector 可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心。

// 创建一个socket套接字
Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 9898)
// 获取SocketChannel
SocketChannel channel = socket.getChannel();
// 创建选择器
Selector selector = Selector.open();
// 将SelectorChannel切换到非阻塞模式
channel.configureBlocking(false);
// 想selector注册channel
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

  • 不管磁盘io 还是网络io我们都可以用到nio的非阻塞和缓冲buffer

二、pageCache

2.1 文件

从应用程序的角度看,操作系统提供了一个统一的虚拟机,在该虚拟机中没有各种机器的具体细节,只有进程、文件、地址空间以及进程间通信等逻辑概念。这种抽象虚拟机使得应用程序的开发变得相对容易。对于存储设备上的数据,操作系统向应用程序提供的逻辑概念就是"文件"。应用程序要存储或访问数据时,只需读或者写"文件"的一维地址空间即可,而这个地址空间与存储设备上存储块之间的对应关系则由操作系统维护。说白了,文件就是基于内核态Page Cache的一层抽象。

2.2 Page Cache的作用

在这里插入图片描述
图中描述了 Linux 操作系统中文件 Cache 管理与内存管理以及文件系统的关系示意图。从图中可以看到,在 Linux 中,具体文件系统,如 ext2/ext3、jfs、ntfs 等,负责在文件 Cache和存储设备之间交换数据,位于具体文件系统之上的虚拟文件系统VFS负责在应用程序和文件 Cache 之间通过 read/write 等接口交换数据,而内存管理系统负责文件 Cache 的分配和回收,同时虚拟内存管理系统(VMM)则允许应用程序和文件 Cache 之间通过 memory map的方式交换数据。可见,在 Linux 系统中,文件 Cache 是内存管理系统、文件系统以及应用程序之间的一个联系枢纽。

2.3 Page Cache相关的数据结构

每一个 Page Cache 包含若干 Buffer Cache。

1、内存管理系统与Page Cache交互,负责维护每项 Page Cache 的分配和回收,同时在使用 memory map 方式访问时负责建立映射;
2、VFS 与Page Cache交互,负责 Page Cache 与用户空间的数据交换,即文件读写;
3、具体文件系统则一般只与 Buffer Cache 交互,它们负责在外围存储设备和 Buffer Cache 之间交换数据。

Page Cache、Buffer Cache(磁盘快缓冲减少磁盘io)、文件以及磁盘之间的关系如图所示
在这里插入图片描述

  • Linux内核中与Page Cache操作相关的API有很多,按其使用方式可以分成两类:一类是以拷贝方式操作的相关接口, 如read/write/sendfile(Java中transferTo()方法实现)等;另一类是以地址映射方式操作的相关接口,如mmap(Java中FileChannel的MappedByteBuffer map(int mode,long position,long size)实现)。其中sendfile和mmap都是零拷贝的实现方案。

三、mmap和sendfile

3.1 常规文件读写

我们先看下正常文件读写所经历的阶段,即FileChannel#read,FileChannel#write,共涉及四次上下文切换(内核态和用户态的切换,包括read调用,read返回,write调用,write返回)和四次数据拷贝
在这里插入图片描述
常规文件读写示例:

    @Test
    public void testChannel() throws IOException {
        FileInputStream inputStream = null;
        FileOutputStream outputStream = null;
        FileChannel inputChannel = null;
        FileChannel outputChannel = null;
        try {
            inputStream = new FileInputStream(new File("/Users/djg/Downloads/branch.jpg"));
            outputStream = new FileOutputStream(new File("/Users/djg/Downloads/branch2.jpg"));

            // 获取通道
            inputChannel = inputStream.getChannel();
            outputChannel = outputStream.getChannel();

            // 分配缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            // 将通道中数据存入缓冲区
            while(inputChannel.read(byteBuffer) != -1){
                // 切换成读取数据的模式
                byteBuffer.flip();
                //缓冲区中数据写到通道中区
                outputChannel.write(byteBuffer);
                // 清空缓冲区
                byteBuffer.clear();
            }
            System.out.println("读写成功");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            inputChannel.close();
            outputChannel.close();
            System.out.println("数据关闭成功");
        }
    }

3.2 mmap(内存映射)

直接内存映射
Linux提供的mmap系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间;同样地, 内核空间对这段区域的修改也直接反映用户空间。正因为有这样的映射关系, 就不需要在用户态(User-space)与内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率,这就是内存直接映射技术。
在这里插入图片描述
NIO中一个重要的类:MappedByteBuffer——java nio引入的文件内存映射方案,读写性能极高。MappedByteBuffer将文件直接映射到内存。可以映射整个文件,如果文件比较大的话可以考虑分段进行映射,只要指定文件的感兴趣部分就可以。

由于MappedByteBuffer申请的是直接内存,因此不受Minor GC控制,只能在发生Full GC时才能被回收,因此Java提供了DirectByteBuffer类来改善这一情况。它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC来回收内存,也可以调用clean()方法来进行回收

#include <sys/mman.h>
void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);

FileChannel提供了map方法来把文件映射为内存对象:

可以把文件的从position开始的size大小的区域映射为内存对象,mode指出了 可访问该内存映像文件的方式

  • READ_ONLY,(只读):
    试图修改得到的缓冲区将导致抛出ReadOnlyBufferException.(MapMode.READ_ONLY)
  • READ_WRITE(读/写): 对得到的缓冲区的更改最终将传播到文件;该更改对映射到同一文件的其他程序不一定是可见的。
    (MapMode.READ_WRITE)
  • PRIVATE(专用):
    对得到的缓冲区的更改不会传播到文件,并且该更改对映射到同一文件的其他程序也不是可见的;相反,会创建缓冲区已修改部分的专用副本。
    (MapMode.PRIVATE)

mmap映射(rocketmq使用的技术下面会分析)

static final int BUFFER_SIZE = 1024;

/**
 * 使用直接内存映射读取文件
 * @param file
 */
public static void fileReadWithMmap(File file) {

    long begin = System.currentTimeMillis();
    byte[] b = new byte[BUFFER_SIZE];
    int len = (int) file.length();
    MappedByteBuffer buff;
    try (FileChannel channel = new FileInputStream(file).getChannel()) {
        // 将文件所有字节映射到内存中。返回MappedByteBuffer
        buff = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        for (int offset = 0; offset < len; offset += BUFFER_SIZE) {
            if (len - offset > BUFFER_SIZE) {
                buff.get(b);
            } else {
                buff.get(new byte[len - offset]);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    long end = System.currentTimeMillis();
    System.out.println("time is:" + (end - begin));
}

3.3 sendfile

从Linux 2.1版内核开始,Linux引入了sendfile,也能减少一次拷贝
在这里插入图片描述

#include<sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

在java中可以通过transferFrom和transferTo实现

/**
 * filechannel进行文件复制(零拷贝)
 *
 * @param fromFile 源文件
 * @param toFile   目标文件
 */
public static void fileCopyWithFileChannel(File fromFile, File toFile) {
    try (// 得到fileInputStream的文件通道
         FileChannel fileChannelInput = new FileInputStream(fromFile).getChannel();
         // 得到fileOutputStream的文件通道
         FileChannel fileChannelOutput = new FileOutputStream(toFile).getChannel()) {

        //将fileChannelInput通道的数据,写入到fileChannelOutput通道
        fileChannelInput.transferTo(0, fileChannelInput.size(), fileChannelOutput);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static final int BUFFER_SIZE = 1024;
/**
 * BufferedInputStream进行文件复制(用作对比实验)
 *
 * @param fromFile 源文件
 * @param toFile   目标文件
 */
public static void bufferedCopy(File fromFile,File toFile) throws IOException {
    try(BufferedInputStream bis = new BufferedInputStream(new FileInputStream(fromFile));
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(toFile))){
        byte[] buf = new byte[BUFFER_SIZE];
        while ((bis.read(buf)) != -1) {
            bos.write(buf);
        }
    }
}

四、pageCache预读和顺序读写

4.1 预读和顺序读写

我们时常听到顺序读写比随机读写更高效的论断,那么什么是顺序读写?要想搞清楚顺序读写,我们首先要掌握文件的预读机制,它是一种将磁盘块预读到page cache的机制。

Linux内核中文件预读算法的具体过程是这样的:对于每个文件的第一个读请求,系统读入所请求的页面并读入紧随其后的少数几个页面(不少于一个页面,通常是三个页面),这时的预读称为同步预读。对于第二次读请求,如果所读页面不在Cache中,即不在前次预读的group中,则表明文件访问不是顺序访问,系统继续采用同步预读;如果所读页面在Cache中,则表明前次预读命中,操作系统把预读group扩大一倍,并让底层文件系统读入group中剩下尚不在Cache中的文件数据块,这时的预读称为异步预读。无论第二次读请求是否命中,系统都要更新当前预读group的大小。此外,系统中定义了一个window,它包括前一次预读的group和本次预读的group。任何接下来的读请求都会处于两种情况之一:第一种情况是所请求的页面处于预读window中,这时继续进行异步预读并更新相应的window和group;第二种情况是所请求的页面处于预读window之外,这时系统就要进行同步预读并重置相应的window和group。如下是Linux内核预读机制的一个示意图,其中a是某次读操作之前的情况,b是读操作所请求页面不在window中的情况,而c是读操作所请求页面在window中的情况
在这里插入图片描述
下面说说顺序读为什么比随机读要快
在这里插入图片描述
以顺序读为例,当用户发起一个常规文件 fileChannel.read(4kb) 之后,实际发生了两件事(如果是使用mmap技术就不会有第二个操作)

  • 操作系统从磁盘加载了 16kb 进入 PageCache,这被称为预读
  • 操作通从 PageCache 拷贝 4kb 进入用户内存

最终我们在用户内存访问到了 4kb,为什么顺序读快?很容量想到,当用户继续访问接下来的 [4kb,16kb] 的磁盘内容时,便是直接从 PageCache 去访问了。试想一下,当需要访问 16kb 的磁盘内容时,是发生 4 次磁盘 IO 快,还是发生 1 次磁盘 IO+4 次内存 IO 快呢?答案是显而易见的,这一切都是 PageCache 带来的优化

  • 顺序写也类似,rocketmq就是使用了顺序写入每次写入4page,一个page 4kb数据,当然rocketmq还有其他条件刷盘,比如异步10秒内刷盘flushPhysicQueueThoroughInterval:彻底刷盘时间间隔,默认10s (前提是异步输盘,同步刷盘每次都会刷不管数据大小是否满足, 还有mmap技术写入pageCache中其实不用数盘系统也会30s刷一次,这里异步10s类必须刷一次是为了数据安全,因为pageCache中的数据并没有写入磁盘,当断电了会数据丢失)

五、磁盘块和扇区

5.1 什么是扇区和扇区大小

下图显示的是一个盘面,盘面中一圈圈灰色同心圆为一条条磁道,从圆心向外画直线,可以将磁道划分为若干个弧段,每个磁道上一个弧段被称之为一个扇区(图践绿色部分)。扇区是磁盘的最小组成单元,通常是512字节。(由于不断提高磁盘的大小,部分厂商设定每个扇区的大小是4096字节)

在这里插入图片描述
可以通过fdisk -l命令查看扇区大小
在这里插入图片描述

  • 可以看出在我这是512byte
  • 操作系统操作磁盘,也需要通过磁盘驱动器进行。所以离不开扇区的。
  • 扇区是真实的东西。磁盘驱动器操作磁盘数据,每次都按照扇为最小单位操作。

5.2 什么是磁盘块和磁盘块大小

操作系统与磁盘之间交流的最小单位就是磁盘块,它是一个虚拟的概念。是对于操作系统(软件)来说有意义的概念。

由于扇区的数量比较小,数目众多在寻址时比较困难,所以操作系统就将相邻的扇区组合在一起,形成一个块,再对块进行整体的操作。

操作系统忽略对底层物理存储结构的设计。通过虚拟出来磁盘块的概念,在系统中认为块是最小的单位。
可以通过stat /boot/|grep “IO Block” 查看
在这里插入图片描述

  • 可以看出是4kb

磁盘块与扇区的大小关系
既然磁盘块是一个虚拟概念。是操作系统自己"杜撰"的。软件的概念,不是真实的。所以大小由操作系统决定,操作系统可以配置一个块多大。一个块大小=一个扇区大小*2的n次方。N是可以修改的。

块与页的关系
操作系统经常与内存和硬盘这两种存储设备进行通信,类似于“块”的概念,都需要一种虚拟的基本单位。所以,与内存操作,是虚拟一个页的概念来作为最小单位。与硬盘打交道,就是以块为最小单位。

六、rocketMq的pageCache和mmap实现

6.1 首先看看rocketMq的代码结构

有几个主要的模块:

  • broker (broker服务,与生产者消费者和nameserve 建立tcp连接)
  • namesrv (服务命名类似zookpeer)
  • remoting (封装了netty的实现, 提供server和client供broker和namesrv使用)
  • store(数据存储模块,CommitLog ConsumeQueue IndexFile的存储实现)

在这里插入图片描述

  • 本次只介绍消息存储相关

6.2 消息存储接口

  • store模块提供给broker的消息存储接口主要有 putMessage() getMessage() flush() load()忽略参数

在这里插入图片描述
存储消息

 /**
     * Store a message into store.
     *
     * @param msg Message instance to store
     * @return result of store operation.
     */
    PutMessageResult putMessage(final MessageExtBrokerInner msg);

    /**
     * Store a batch of messages.
     *
     * @param messageExtBatch Message batch.
     * @return result of storing batch messages.
     */
    PutMessageResult putMessages(final MessageExtBatch messageExtBatch);

获取消息

/**
     * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
     * from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
     *
     * @param group Consumer group that launches this query.
     * @param topic Topic to query.
     * @param queueId Queue ID to query.
     * @param offset Logical offset to start from.
     * @param maxMsgNums Maximum count of messages to query.
     * @param messageFilter Message filter used to screen desired messages.
     * @return Matched messages.
     * 从给定偏移量开始,在queueId处查询属于主题的大多数maxMsgNums消息。结果消息将使用提供的消息过滤器进一步进行筛选。
     */
    GetMessageResult getMessage(final String group, final String topic, final int queueId,
        final long offset, final int maxMsgNums, final MessageFilter messageFilter);

DefaultMessageStore

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
//        存储服务不可用
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

//        broker角色不是master
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

//        没有写权限
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

//        topic长度不合法
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

//        消息属性长度不合法
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

//        系统繁忙
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
//        commitLog存储消息=》
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }
  • 前面都是检查 最后 PutMessageResult result = this.commitLog.putMessage(msg);
    通过commitLog对象去存储

CommitLog

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // crc32 类似 md5
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

//        从消息中获取topic
        String topic = msg.getTopic();
//        从消息中获取queueId
        int queueId = msg.getQueueId();

//        获取事务类型
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//        如果没有事务或提交事务延迟执行
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

//                设置延迟消息的topic
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId 备份真正的topic和queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
//        获取映射文件队列的最后一个映射文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

//        自旋锁或者互斥锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
//            开始锁定时间
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
    //                映射文件不存在或者映射文件满了以起始位置的offset获取最后的映射文件=》
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

//            映射文件中添加消息=》
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
//                    映射文件不存在或者映射文件满了
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message 创建一个文件读写消息
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
//                    处理消息=》
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
//                    消息过大
                case MESSAGE_SIZE_EXCEEDED:
//                    消息属性过大
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
//            解锁映射文件
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics 单次存储消息topic次数
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
//        单次存储消息topic大小
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

//        磁盘刷新=》
        handleDiskFlush(result, putMessageResult, msg);
//        主从刷新=》
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

MappedFile mmap映射文件抽象成MappedFile,此时写入pageCache中去了

 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

//        获取当前写的位置
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
//                消息序列化后组装映射的buffer=》
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
//                批量消息序列化后组装映射的buffer
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

磁盘刷新

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush 同步刷新
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
//                countdownLatch.await() 同步等待刷新结果,除非超时
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
//                如果异步直接解除阻塞 countdownLatch.countDown()
                service.wakeup();
            }
        }
        // Asynchronous flush 异步刷新
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

6.3 同步刷盘

同步刷新 主线程阻塞住, 刷盘线程完了主线程可以继续使用countDownLatch.await(timeout, TimeUnit.MILLISECONDS); 超时时间5s

public static class GroupCommitRequest {
        private final long nextOffset;
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        public long getNextOffset() {
            return nextOffset;
        }

        public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }
//
        public boolean waitForFlush(long timeout) {
            try {
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
                return false;
            }
        }
    }

通过cas方式去唤醒刷盘线程, (刷盘线程GroupCommitService也有在定时刷盘broker启动的时候会start线程,这里同步方式就去唤醒),只能同时一个去
这里FlushCommitLogService继承ServiceThread, ServiceThread实现Runnable

   class GroupCommitService extends FlushCommitLogService {
        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

        public synchronized void putRequest(final GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            // hasNotified  是AtomicBoolean
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }
//回到doCommit方法中,这时会发现这里是对requestsRead这条List进行的操作,而刚才是将记录存放在requestsWrite这条List中的
//这就和在run方法中的waitForRunning方法有关了
        private void doCommit() {
        //volatile 类型requestsRead 在提交的时候锁住不让修改交换
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }

                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                //waitPoint 计数器重置,并且阻塞住本线程(GroupCommitService实现了Runnable)
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
            //交换两个list requestsWrite  requestsRead
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

        @Override
        protected void onWaitEnd() {
            this.swapRequests();
        }

        @Override
        public String getServiceName() {
            return GroupCommitService.class.getSimpleName();
        }

        @Override
        public long getJointime() {
            return 1000 * 60 * 5;
        }
    }

MappedFileQueue 里面CopyOnWriteArrayList维护着MappedFile

//
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
//        根据offset找到映射文件=》
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
//            =》
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            //在完成MappedFile的flush后,还需要更新flushedWhere的值此时缓存中的数据完成了持久化,同步刷盘结束
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

刷盘 执行mappedByteBuffer.force();

   public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

isAbleToFlush 根据flushLeastPages 检查是否可flush, 其中flush记录的是上一次完成刷新后的位置,write记录的是当前消息内容写入后的位置

   private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();

        if (this.isFull()) {
            return true;
        }
		//当flushLeastPages 大于0的时候,通过:
        if (flushLeastPages > 0) {
        // OS_PAGE_SIZE = 1024 * 4; 
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }

        return write > flush;
    }
  • 可以计算出是否满足page的要求,其中OS_PAGE_SIZE是4K,也就是说1个page大小是4k
  • 由于这里是同步刷盘,flushLeastPages是0,不对page要求,只要有缓存有内容就会刷盘;但是在异步刷盘中,flushLeastPages是4,也就是说,只有当缓存的消息至少是4(page个数)*4K(page大小)=16K时,异步刷盘才会将缓存写入文件(如果10s内没有达到16k也会刷盘)

6.4 异步刷盘

6.4.1 FlushRealTimeService 缓存到mappedByteBuffer -> 写入磁盘(包括同步刷盘)

class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0;
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                //flushCommitLogTimed 是否使用定时刷盘
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
				//interval:刷盘时间间隔,默认500ms
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                //flushPhysicQueueLeastPages:page大小,默认4个
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
				//flushPhysicQueueThoroughInterval:彻底刷盘时间间隔,默认10s
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
					//和同步刷盘一样flushPhysicQueueLeastPages不是0 而是4了
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushRealTimeService.class.getSimpleName();
        }

        private void printFlushProgress() {
            // CommitLog.log.info("how much disk fall behind memory, "
            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
        }

        @Override
        public long getJointime() {
            return 1000 * 60 * 5;
        }
    }
  • 首先根据lastFlushTimestamp(上一次刷盘时间)+
    flushPhysicQueueThoroughInterval和当前时间比较,判断是否需要进行一次彻底刷盘,若达到了需要则将flushPhysicQueueLeastPages置为0
  • 接着根据flushCommitLogTimed判断 当flushCommitLogTimed为true,使用sleep等待500ms
    当flushCommitLogTimed为false,调用waitForRunning在超时时间为500ms下阻塞,其唤醒条件也就是在handleDiskFlush中的wakeup唤醒
  • 最后,和同步刷盘一样,调用mappedFileQueue的flush方法
    只不过,这里的flushPhysicQueueLeastPages决定了其是进行彻底刷新,还是按4page(16K)的标准刷新

6.4.2 CommitRealTimeService 存到writeBuffer -> 缓存到fileChannel -> 写入磁盘

abstract class FlushCommitLogService extends ServiceThread {
        protected static final int RETRY_TIMES_OVER = 10;
    }

    class CommitRealTimeService extends FlushCommitLogService {

        private long lastCommitTimestamp = 0;

        @Override
        public String getServiceName() {
            return CommitRealTimeService.class.getSimpleName();
        }

        @Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
//                刷新commitLog频次200ms
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//                提交数据页数4
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//                提交commitLog最大频次200ms
                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

                long begin = System.currentTimeMillis();
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }

                try {
//                    提交数据=》
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
  public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
        // 先hold住 AtomicLong(1),同时只能一个线程
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

可以看出是fileChannel模式刷盘

 protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
  • 异步刷盘有两种方式默认是mappedByteBuffer

总结


版权声明:本文为qq_32655207原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。