文章目录
概述
只支持固定级别的延迟消息,不支持任意时间精度(出于性能和复杂度考虑)
实现原理:写入时转发到内置主题:SCHEDULE_TOPIC_XXXX,然后ScheduleMessageService中到延迟任务会延迟对应时间,然后从该主题消费队列中将消息拉取出来,获取消息的原主题与原消息消费队列重新写入到commitlog,此时原消费者可以消费到。
从实现原理可以看出,延迟消息的延迟时间指的是延迟投递时间,如果消费者有积压,则实际消费时间和延迟时间不一致。
实现类
ScheduleMessageService extends ConfigManager
核心属性
//内置topic
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
//第一次调度时延迟的时间
private static final long FIRST_DELAY_TIME = 1000L;
//每一延时级别调度一次后延迟该时间间隔后再放入调度池
private static final long DELAY_FOR_A_WHILE = 100L;
//发送异常后延迟该时间后再继续参与调度
private static final long DELAY_FOR_A_PERIOD = 10000L;
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
//默认消息存储器
private final DefaultMessageStore defaultMessageStore;
//MessageStoreConfig#messageDelayLevel中最大消息延迟级别
private int maxDelayLevel;
工作流程
load
构建offsetTable
解析${ROCKET_HOME}/store/config/delayOffset.json成offsetTable
构建delayLevelTable
解析messageDelayLevel成delayLevelTable
//MessageStoreConfig
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
start
遍历
delayLevelTable,对每个level执行this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);开启定时持久化延迟消息消费offset任务:默认每10s一次,可修改
DeliverDelayedMessageTimerTask
下列流程是单个延迟任务的executeOnTimeup流程
定位ConsumeQueue
根据topic和level对应的queueId找到ConsumeQueue
从ConsumeQueue解析消息
ConsumeQueue中每条消息为一个ConsumeQueueExt.CqExtUnit,长度为20字节,结构如下:
| 8字节 | 4字节 | 8字节 |
|---|---|---|
| offset | size | tagsCode |
这里有个特殊的地方,tagsCode这个值一般是记录消息Tag的哈希值,用于消息过滤;而延迟消息这个tagsCode的值是deliverTimestamp,也就是实际应该投递的时间戳。 这个值是commitlog刷盘时就计算好的。
//CommitLog#checkMessageAndReturnSize
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);
}
检查当前时间是否已经超过投递时间
如果没超过
新建延迟任务,延迟(投递时间-当前时间)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
如果超过则投递到原来的消费队列中
- 根据offset和size从commitlog里获取
MessageExt:ScheduleMessageService.this.defaultMessageStore#lookMessageByOffset(offset,size) - 将
MessageExt转成MessageExtBrokerInner,其中包括tagCode的重新计算等:MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); - 投递:
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner)
至此延迟任务结束
延迟消息流程

写入
如果是延迟消息,将topic换成延迟消息内置topic,queueId换成延迟等级对应的queueId,真实的topic和queueId则放到消息properties里面。
CommitLog#asyncPutMessage
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}