RocketMQ源码解析之消息存储

前言

在RocketMQ读写消息时,用的是Java NIO类库中的类,关于这些类的介绍和使用,可以先看我的前一篇文章Java传统IO和NIO有什么区别。接下来就从源码来分析当broker接收到消息时是如何处理的。

一、消息处理入口

消息处理入口:BrokerController.registerProcessor(),Broker为每一种请求的Code都注册了一个处理类,其中用于处理接收消息的类为SendMessageProcessor

        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

当消息到达时,会调用SendMessageProcessor.processRequest()

    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:// 默认走这里
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                	// 处理批消息
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                	// 处理普通消息
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

接下来就来看看处理普通消息的过程,即sendMessage()方法。大致可以分为两步:

  1. 根据接收到的消息,封装成一个MessageExtBrokerInner对象
  2. 调用MessageStore.putMessage()方法
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
                                        
        // 部分代码省略


        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

		// 第一步:从请求中取出消息的相关内容,并封装成一个MessageExtBrokerInner对象。
		// 后续在broker内的消息处理都会使用这个MessageExtBrokerInner对象
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        PutMessageResult putMessageResult = null;
        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
        	// 第二步:普通消息的处理,执行MessageStore的putMessage方法
        	// 传入的参数就是上面构建的MessageExtBrokerInner对象
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

    }

接下来重点就是这个MessageStore.putMessage()方法啦,再看这个方法之前,我们先来了解一下接下来会涉及到的各种结构,对后面理解代码会很有帮助。

二、CommitLog结构

在这里插入图片描述

在这里插入图片描述
结合上面这两张图,首先,每个broker都有一个MessageStoreMessageStore是一个接口,它的实现类为DefaultMessageStoreDefaultMessageStore中有一个成员变量CommitLog,而CommitLog中又有一个MappedFileQueueMappedFileQueue中保存了一个MappedFile的List,其中每一个MappedFile都对应了磁盘上的一个文件,这些文件才是真正存储消息的地方。

可以这么理解,从我们用户的角度来看,所有消息都好像被保存到了一个很大的文件CommitLog上,而实际上RocketMQ在实现的时候把这个CommitLog拆成了一个个小文件MappedFile,而这些MappedFile都被一个MappedFileQueue所管理。

上面第二张图中CommitLog中那些数字代表什么?
在RocketMQ中,每个MappedFile默认大小都是1G,每个MappedFile的文件名都是一个20位的数字,如“00000000001073741824”,如果把左边的0都去掉,即"1073741824",这个数字就是当前MappedFile中第一个byte在整个CommitLog中的位置,也就是偏移量。假设给定一个CommitLog偏移量为1073741830,那么我们很容易就可以知道,这个数据一定是在第二个MappedFile中,并且在第二个MappedFile中的相对偏移量为1073741830 - 1073741824 = 6

当一个MappedFile写满以后,就会创建一个新的MappedFile然后往这个新的MappedFile写。

好了,接下来我们再回到上面的那个代码中的第二步,也就是创建了MessageExtBrokerInner对象以后调用的MessageStore.putMessage()方法。这里大家其实应该能猜到后面要干什么了吧,因为最终目的是要往MappedFile里面写数据, 那MessageStore.putMessage()肯定得先调用CommitLog的某个put方法,那CommitLog肯定会去MappedFileQueue找一个MappedFile,然后调用MappedFile的某个put方法。下面看实际代码

三、CommitLog处理消息写入主流程

代码DefaultMessageStore.putMessage()

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    	
    	// 此处省略了broker状态和消息校验的代码
    	
        long beginTime = this.getSystemClock().now();
        // 果然调用了commitLog的putMessage方法,接下来看这个方法
        PutMessageResult result = this.commitLog.putMessage(msg);

        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

代码:CommitLog.putMessage()

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

		// 一个数据统计的服务,不用管
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 处理事务消息,先不管
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        long eclipsedTimeInLock = 0;
		
		// *************************  主流程  *************************
		// 下面开始是主要内容
        MappedFile unlockMappedFile = null;
        // 1.从MappedFileQueue中获取一个MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

		// 加锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
            	// 2. 上一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
			
			// 3.调用MappedFile的appendMessage来写入数据
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

		// 4.消息刷盘
        handleDiskFlush(result, putMessageResult, msg);
        // 涉及主从同步,先不管
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

3.1 主流程第一步

Commit.putMessage()中,存放消息的主流程大致可以分为四步:

  1. MappedFileQueue中获取一个MappedFile
  2. 第一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile,否则不需要第二步
  3. 调用MappedFile.appendMessage()方法将数据写入MappedFile
  4. 消息刷盘

接下来我们详细分析前三步,消息刷盘后面再完成。

主流程第一步

  1. 从MappedFileQueue中获取一个MappedFile
  2. 第一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile,否则不需要第二步
  3. 调用MappedFile.appendMessage()方法将数据写入MappedFile

代码:MappedFileQueue.getLastMappedFile()

    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
            	// 直接获取List中的最后一个MappedFile
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }
        return mappedFileLast;
    }

这个方法很简单,直接返回了MappedFileQueue中保存的最后一个MappedFile。为什么是最后一个?前面说到过,MappedFile默认大小为1G,写满就创建一个新的加到MappedFileQueue里面,因此在MappedFileQueue中,只有最后一个MappedFile有可能还没写满,前面的肯定已经写满了,因此直接获取最后一个就可以了。
还有一个问题,MappedFileQueue里的MappedFile是怎么来的呢?如果是一台全新的服务器,那肯定没有MappedFile,那就直接返回null,后面会创建。那如果是运行过的服务器呢?肯定会在服务器启动的时候去读这些MappedFile文件,然后初始化好这个MappedFileQueue
看一下MappedFileQueue,果然有一个load会根据文件路径去加载这些MappedFile

    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                	// 根据文件路径创建MappedFile
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    // 加入到MappedFile的List中
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }
        return true;
    }

MappedFile.load()是什么时候调用的呢?是在CommitLog.load()中,而CommitLog.load()是在DefaultMessageStore.load()中调用,而DefaultMessageStore.load()又是在broker启动的时候调用的,现在大家应该清楚了。

3.2 主流程第二步

主流程第二步

  1. 从MappedFileQueue中获取一个MappedFile
  2. 第一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile,否则不需要第二步
  3. 调用MappedFile.appendMessage()方法将数据写入MappedFile

假设第一步获取的MappedFile为空或者满了,需要创建新的MappedFile,下面来详细看看MappedFile是怎么创建的,也就是主流程注释2的那段代码

mappedFile = this.mappedFileQueue.getLastMappedFile(0);

虽然和主流程第一步一样,都是调用的MappedFileQueue.getLastMappedFile()方法,但是第二步调的是一个重载版本的,传了个参数0,该方法实际调用的是

    public MappedFile getLastMappedFile(final long startOffset) {
    	// 其中needCreate参数为true,表示需要创建新的MappedFile
        return getLastMappedFile(startOffset, true);
    }

下面详细看看这个MappedFileQueue.getLastMappedFile(final long startOffset, boolean needCreate)

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    	// 1.看下面解释
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
		
        if (createOffset != -1 && needCreate) {
        	// 2.看下面解释
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            // 3.看下面解释
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            if (this.allocateMappedFileService != null) {
            	// 4.看下面解释
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

下面按注释的序号来解释

  1. 这个createOffset是个啥?还记得上面说到的MappedFile的文件名是一串20位的数字吗,除了左侧的0以外剩下的数字就代表这个MappedFile的第一个byte在整个CommitLog中的偏移量。那现在既然要创建新的MappedFile,那它的第一个byte在CommitLog中的偏移量是不是得算出来,这样才能知道要创建的MappedFile的文件名叫什么。怎么计算呢?如果这是一台全新的服务器,从来没有过MappedFile,那文件名肯定是"00000000000000000000",如果之前创建过MappedFile呢,那也很简单,比如现在有两个MappedFile,分别是“00000000000000000000”和“00000000001073741824”(也就是1G),这两都写满了,那下一个MappedFile的文件名不就是刚才最后一个MappedFile文件名加上文件大小吗(1G),也就是1073741824(上一个MappedFile的文件名)+1073741824(文件大小)=2147483648,然后左边补0到20位,就好了。现在再看上面注释1下面的代码是不是就很清楚了。
  2. 有了createOffset,现在就可以用它把文件的全路径名组装起来了,就是MappedFile的存储路径+转换后的createOffset。就是UtilAll.offset2FileName(createOffset)方法,就是在createOffset左边补0到20位。最后得到下一个文件创建的路径,也就是nextFilePath
  3. 为啥又来一个nextNextFilePath,看名字创建一个MappedFile还不够,它要创建两个呀,没错,当要创建新的MappedFile的时候,就是会尝试去创建两个,但是第一个创建完就可以返回了,第二个在后台自己创建,能不能创建成功不关心。
  4. 真开始创建MappedFile了。

关于第4点涉及到满多东西,这里单独讲一下。这里又出现了一个新的东西,AllocateMappedFileService,这是MappedFileQueue的一个成员变量。

private final AllocateMappedFileService allocateMappedFileService;

看名字也知道这是一个分配MappedFile的服务线程,既然是服务线程,看一下它的run方法

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped() && this.mmapOperation()) {

        }
        log.info(this.getServiceName() + " service end");
    }

就是一个死循环,不停执行这个mmapOperation()方法,这个方法稍后再看,先回到第四点:

mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);

看一下这个putRequestAndReturnMappedFile方法。看名字好像是把一个创建MappedFile的请求放到某个地方,接下来这个服务线程的mmapOperation就会把这个请求拿出来并创建一个MappedFile返回。

这个AllocateMappedFileService有几个成员变量需要注意一下

    // 创建等待时间,默认是5s
    private static int waitTimeOut = 1000 * 5;
    // 一个请求的Map,key表示的是创建的文件名,value就是AllocateRequest对象
    private ConcurrentMap<String, AllocateRequest> requestTable =
        new ConcurrentHashMap<String, AllocateRequest>();
    // 一个优先阻塞队列
    private PriorityBlockingQueue<AllocateRequest> requestQueue =
        new PriorityBlockingQueue<AllocateRequest>();

在看一眼这个AllocateRequest,它是AllocateMappedFileService的一个内部类

    static class AllocateRequest implements Comparable<AllocateRequest> {
        // Full file path
        private String filePath;
        private int fileSize;
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        // 这就是用来存放创建好的MappedFile,刚开始是null
        private volatile MappedFile mappedFile = null;
        ......
    }

OK,现在我们来梳理一下这个创建的流程,看下面这张图

在这里插入图片描述
首先根据文件的路径和大小,创建一个AllocateRequest,然后把这个AllocateRequest放到requestTablerequestQueue里面,注意,是同一个AllocateRequest引用放到了这两个结构里,对其中一个的修改也会影响到另一个。之后,putRequestAndReturnMappedFile就会调用刚才AllocateRequestcountDownLatch.await(),等待MappedFile创建完成。而上面提到了,AllocateMappedFileService中的run方法一直在循环,运行mmapOperation方法,该方法就会从requestQueue中取出一个AllocateRequest,然后根据里面的参数去创建MappedFile,并把这个新建的MappedFile放到AllocateRequest中。完成后调用该AllocateRequestcountDownLatch.countDown方法,相当于通知上面的等待过程,我已经创建好啦,你可以取走这个MappedFile了。这时,putRequestAndReturnMappedFile就会恢复执行,并从requestTable中取出这个新建的MappedFile

看一下代码:AllocateMappedFileService.putRequestAndReturnMappedFile()

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    	// 默认可以提交两个创建请求
        int canSubmitRequests = 2;
        // 这里和TransientStorePool有关系,这个enable默认是false,先不考虑这个,后面再分析
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }
		// 还记得上面说过,创建MappedFile的时候,默认会创建两个,分别是nextFilePath和nextNextFilePath
		// 创建一个AllocateRequest,这是我们第一个要创建的MappedFile
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        // 把这个AllocateRequest放到requestTable里面
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        if (nextPutOK) {
        	// 此时是>0的,不用管这里
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            // 把AllocateRequest放到requestQueue里面,可以看到,和requestTable是同一个AllocateRequest
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            // 能提交的请求数减1
            canSubmitRequests--;
        }

		// 上面同样的操作,对nextNextFilePath,也就是第二个MappedFile执行一遍
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

		// 这里从requestTable获取第一个要创建的MappedFile
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
            	// 等待第一个MappedFile创建完成,默认等待5s
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                	// 从requestTable中移除这个请求
                    this.requestTable.remove(nextFilePath);
                    // 直接返回创建的第一个MappedFile
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

看完代码应该可以理解为什么只关心第一个MappedFile是否创建完成了,虽然发了两个创建请求,但是只等待第一个创建请求完成,就直接返回对应的MappedFile了,至于第二个请求,继续在后台创建。

真正创建MappedFIle是由mmapOperation()完成的,看一下它的代码

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
        	// 从requestQueue中取出一个AllocateRequest
            req = this.requestQueue.take();
            // 再从requestTable取出对应的AllocateRequest
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            // 从requestQueue和requestTable取出来的AllocateRequest应该是一样的,前面说到了,放进去的时候就是放一样的
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }

            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                // 这个默认是false
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                	// 所以是在这里创建了MappedFile,直接new了一个
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
                if (elapsedTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                // 文件预热,下面会讲
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }

				// 把创建好的MappedFile在放到对应的AllocateRequest里面
                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
            	// 通知等待创建的线程,以及创建好MappedFile了
                req.getCountDownLatch().countDown();
        }
        return true;
    }

看一下是怎么直接new一个MappedFile

    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }
    private void init(final String fileName, final int fileSize) throws IOException {
    	// 文件名
        this.fileName = fileName;
        // 大小
        this.fileSize = fileSize;
        this.file = new File(fileName);
        // fileFromOffset就是文件名去掉左边的0,前面说过了
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // 原来就是用fileChannel.map()创建了一个MappedByteBuffer
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

看过上一篇文章(Java传统IO和NIO有什么区别)以后,对这个创建MappedByteBuffer应该不陌生了。所以这个创建过程很简单,MappedFile中有一个成员变量mappedByteBuffer,它的类型就是MappedByteBuffer,所以就是通过FileChannel.map()创建了一个MappedByteBuffer并保存在了MappedFile的成员变量mappedByteBuffer里面,以及一些其他成员变量的赋值。这样一个MappedFile就创建好啦。

刚才mmapOperation()中还提到了一个文件预热,这里简单讲一下。在创建了一个MappedByteBuffer以后,物理内存中真的就有一块空间了和文件对应了吗?还是在上一篇文章中(Java传统IO和NIO有什么区别)说到过,其实是没有,仅仅是做了一个映射,创建了一些数据结构而已,那当我们真正访问这块虚拟地址的时候,会发生缺页中断,然后才会把磁盘的数据读到内存。而文件预热干的事情,就是提前访问这块虚拟空间,把缺的页提前调进内存,这样当RocketMQ真的往里写数据的时候,就不会缺页中断了。提前访问的方式就是往每一页都写入一个0。

这样会不会有问题?我提前把页换进来了,但是操作系统此时运行着很多很多的进程,我们知道内存空间不够用了就会产生换页,万一我们刚刚提前换进来的页又被换出去了怎么办,那刚才做的文件预热不是白干了吗?RocketMQ中还有一段代码,会把文件预热过程中换进来的页都锁定在内存里,也就是不会被换出去。具体代码大家感兴趣可以自己去看看。

大家还记得我们讲到主流程的哪一步了吗?

主流程第二步

  1. 从MappedFileQueue中获取一个MappedFile
  2. 第一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile,否则不需要第二步
  3. 调用MappedFile.appendMessage()方法将数据写入MappedFile

才刚讲完第二步,创建一个新的MappedFile的过程,这部分有点多,大家多看看就会理解了。

最后:

3.3 主流程第三步

主流程第三步

  1. 从MappedFileQueue中获取一个MappedFile
  2. 第一步获取的MappedFile为空或者满了,就要创建一个新的MappedFile,否则不需要第二步
  3. 调用MappedFile.appendMessage()方法将数据写入MappedFile

终于要写入数据了。主流程代码在太上面了,提醒大家一下是在ComitLog.public PutMessageResult putMessage(final MessageExtBrokerInner msg)方法中。第三步即:

result = mappedFile.appendMessage(msg, this.appendMessageCallback);

调用的是MappedFile.appendMessage()方法

    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

		// 当前的写入位置
        int currentPos = this.wrotePosition.get();

		// 写入位置小于文件大小,说明还可以往里写
        if (currentPos < this.fileSize) {
        	// 这个writeBuffer和前面提到的transientStorePool有关系
        	// 只有开启了transientStorePool这个writeBuffer才不是空
        	// 前面提到默认是没有开启的,因此这个byteBuffer就是this.mappedByteBuffer.slice()
        	// mappedByteBuffer.slice()会创建一个新的byte buffer,新旧buffer的在内容上是共享的,因此在新的buffer上做的修改也会反应的旧的buffer
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            // 把位置调整的当前写入位置
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // 走这里
            if (messageExt instanceof MessageExtBrokerInner) {
            	// 开始写入buffer,第一个参数就是文件名去掉左边的0,表示在整个CommitLog中的偏移量
            	// 第三个参数fileSize - currentPos不就是剩余能写入的长度吗
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 修改写位置
            this.wrotePosition.addAndGet(result.getWroteBytes());
            // 修改时间戳
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

这个AppendMessageCallback是从CommitLog里传进来的, AppendMessageCallback是一个接口,CommitLog中有一个内部类DefaultAppendMessageCallback实现了这个接口,CommitLog在创建的时候就会创建一个DefaultAppendMessageCallback的实例。

接下来看看这个DefaultAppendMessageCallback.doAppend()方法,方法挺长,但是并不复杂,就是将msgInner里面消息数据都写到前面得到的MappedByteBuffer

// 第二个参数,我们上面传进来的是一个MappedByteBuffer
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            // PHY OFFSET
            // 在CommitLog中的偏移量,就是文件名+相对偏移量
            long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            // 计算msgID,前四个字节是host的ip地址,5-8字节是port,最后8个字节是wroteOffset
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

            // Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consumer queuec
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            /**
             * Serialize message
             */
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;
            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

			// 计算消息的长度,也就是要写入的长度
            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            // 消息长度超过最大阈值
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            // 是否有足够的可用空间
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {//没有足够空间
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            // 消息最终存储在CommitLog中的格式
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            // 写到byteBuffer里
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
        }

到这里,消息就已经写到了内存的缓冲区中了。这篇文章也就到这里了。

四、 其他问题

关于上面提到的transientStorePool,之前讲的都是不开启的情况,那开启以后会怎么样呢?以及消息现在还是在内存的缓冲区中,怎么真正刷到磁盘呢?这个下篇文章再说。


版权声明:本文为AllENNNNNNN_原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。