直接消费消息:AdminBrokerProcessor#consumeMessageDirectly
调用入口:AdminBrokerProcessor#processRequest 中调用 this.consumeMessageDirectly(ctx, request)
/**
 * Handles a "consume message directly" admin request.
 *
 * <p>Decodes the request header, tags the request with this broker's name,
 * loads the raw bytes of the message identified by the header's msgId from
 * the message store, attaches them as the request body, and forwards the
 * request to the target consumer via {@code callConsumer}.
 *
 * @param ctx     channel context of the incoming request (not used directly here)
 * @param request the incoming command; its extFields and body are mutated
 * @return whatever {@code callConsumer} returns for the forwarded request
 * @throws RemotingCommandException if the custom header cannot be decoded, or
 *         if the message store has no message at the offset encoded in msgId
 */
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
        .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
    // Copy the broker name from brokerConfig into the request's extFields.
    request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
    SelectMappedBufferResult selectMappedBufferResult = null;
    try {
        MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
        // Look the message up in the store by the offset carried in the msgId.
        selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());
        // The store lookup can return null (no data at that offset); fail with a
        // descriptive, declared exception instead of a NullPointerException.
        if (selectMappedBufferResult == null) {
            throw new RemotingCommandException(
                "No message found in message store for msgId " + requestHeader.getMsgId());
        }
        byte[] body = new byte[selectMappedBufferResult.getSize()];
        selectMappedBufferResult.getByteBuffer().get(body);
        request.setBody(body);
    } catch (UnknownHostException ignored) {
        // Best effort, preserved from the original behavior: if the lookup above
        // fails with UnknownHostException, the request is still forwarded to the
        // consumer, just without a body.
        // NOTE(review): consider logging here once a logger is confirmed in scope.
    } finally {
        // Always release the buffer reference obtained from the store.
        if (selectMappedBufferResult != null) {
            selectMappedBufferResult.release();
        }
    }
    // Forward to the consumer and return its reply.
    return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),
        requestHeader.getClientId());
}
DefaultMessageStore#selectOneMessageByOffset
/**
 * Fetches one message from the commit log starting at the given offset.
 *
 * <p>Works in two steps: first reads 4 bytes at {@code commitLogOffset} and
 * interprets them as an int (the TOTALSIZE field), then reads that many bytes
 * from the same offset. The 4-byte probe result is always released.
 *
 * @param commitLogOffset offset in the commit log at which the message starts
 * @return the result of the second read covering the whole message, or
 *         {@code null} if the initial 4-byte read returned {@code null}
 */
@Override
public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
    SelectMappedBufferResult sizeProbe = this.commitLog.getMessage(commitLogOffset, 4);
    if (sizeProbe == null) {
        return null;
    }
    try {
        // 1 TOTALSIZE
        int totalSize = sizeProbe.getByteBuffer().getInt();
        return this.commitLog.getMessage(commitLogOffset, totalSize);
    } finally {
        sizeProbe.release();
    }
}
版权声明:本文为qq_26400953原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。