rocketmq 之namesrv(三十六)mqclient admin请求处理直接消费消息

直接消费消息AdminBrokerProcessor#consumeMessageDirectly

AdminBrokerProcessor#processRequest#this.consumeMessageDirectly(ctx, request)

 private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
            .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);

        /**从brokerConfig中获取brokerName设置到request的extFields中*/
        request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
        SelectMappedBufferResult selectMappedBufferResult = null;
        try {
            MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
            /**根据offset从commitLog中获取消息*/
            selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());

            byte[] body = new byte[selectMappedBufferResult.getSize()];
            selectMappedBufferResult.getByteBuffer().get(body);
            request.setBody(body);
        } catch (UnknownHostException e) {
        } finally {
            if (selectMappedBufferResult != null) {
                selectMappedBufferResult.release();
            }
        }
        /**返回消费者运行信息*/
        return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),
            requestHeader.getClientId());
    }
DefaultMessageStore#selectOneMessageByOffset
@Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        /**SelectMappedBufferResult中存储的是消息offset、创建时间*/
        SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
        if (null != sbr) {
            try {
                // 1 TOTALSIZE
                int size = sbr.getByteBuffer().getInt();
                return this.commitLog.getMessage(commitLogOffset, size);
            } finally {
                sbr.release();
            }
        }

        return null;
    }

版权声明:本文为qq_26400953原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。