rocketmq笔记7-延迟消息

概述

只支持固定级别的延迟消息,不支持任意时间精度(出于性能和复杂度考虑)

实现原理:写入时转发到内置主题:SCHEDULE_TOPIC_XXXX,然后ScheduleMessageService中到延迟任务会延迟对应时间,然后从该主题消费队列中将消息拉取出来,获取消息的原主题与原消息消费队列重新写入到commitlog,此时原消费者可以消费到。

从实现原理可以看出,延迟消息的延迟时间指的是延迟投递时间,如果消费者有积压,则实际消费时间和延迟时间不一致。

实现类

ScheduleMessageService extends ConfigManager

核心属性

//内置topic
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
//第一次调度时延迟的时间
private static final long FIRST_DELAY_TIME = 1000L;
//每一延时级别调度一次后延迟该时间间隔后再放入调度池
private static final long DELAY_FOR_A_WHILE = 100L;
//发送异常后延迟该时间后再继续参与调度
private static final long DELAY_FOR_A_PERIOD = 10000L;

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);
//默认消息存储器
private final DefaultMessageStore defaultMessageStore;
//MessageStoreConfig#messageDelayLevel中最大消息延迟级别
private int maxDelayLevel;

工作流程

load

构建offsetTable

解析${ROCKET_HOME}/store/config/delayOffset.json成offsetTable

构建delayLevelTable

解析messageDelayLeveldelayLevelTable

//MessageStoreConfig
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

start

  1. 遍历delayLevelTable,对每个level执行this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);

  2. 开启定时持久化延迟消息消费offset任务:默认每10s一次,可修改

DeliverDelayedMessageTimerTask

下列流程是单个延迟任务的executeOnTimeup流程

定位ConsumeQueue

根据topic和level对应的queueId找到ConsumeQueue

从ConsumeQueue解析消息

ConsumeQueue中每条消息为一个ConsumeQueueExt.CqExtUnit,长度为20字节,结构如下:

8字节4字节8字节
offsetsizetagsCode

这里有个特殊的地方,tagsCode这个值一般是记录消息Tag的哈希值,用于消息过滤;而延迟消息这个tagsCode的值是deliverTimestamp,也就是实际应该投递的时间戳。 这个值是commitlog刷盘时就计算好的。

//CommitLog#checkMessageAndReturnSize
if (delayLevel > 0) {
    tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);
}

检查当前时间是否已经超过投递时间

如果没超过

新建延迟任务,延迟(投递时间-当前时间)

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;

如果超过则投递到原来的消费队列中

  1. 根据offset和size从commitlog里获取MessageExtScheduleMessageService.this.defaultMessageStore#lookMessageByOffset(offset,size)
  2. MessageExt转成MessageExtBrokerInner,其中包括tagCode的重新计算等:MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
  3. 投递:ScheduleMessageService.this.writeMessageStore.putMessage(msgInner)

至此延迟任务结束

延迟消息流程

在这里插入图片描述

写入

如果是延迟消息,将topic换成延迟消息内置topic,queueId换成延迟等级对应的queueId,真实的topic和queueId则放到消息properties里面。

CommitLog#asyncPutMessage

if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }

    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

版权声明:本文为torisang原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。