掘金 后端 ( ) • 2024-03-28 15:37

theme: hydrogen

在上一篇文章中,简单的描述了RocketMQ5定时消息实现的大体框架。本期,我将深入到源码,从源码出发,看在RocketMQ5中是怎样实现定时消息的准时触发的。

切换Topic

所有的消息在发送到Broker后存储commitLog前,会有三个Hook对所有的消息进行前置处理。
RocketMQ Hook.jpg
其中,在handleScheduleMessageHook中会对定时消息进行处理。其处理逻辑为以下几点:

  1. 判断是否为事务消息,不是才继续执行
  2. 是否已经修改过Topic为定时消息的Topic。
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
    final MessageExtBrokerInner msg) {
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 不是事务消息或者是提交消息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // TOPIC已经修改为TIMER_TOPIC的消息,一般只有消息在时间轮中循环才会跳过
        if (!isRolledTimerMessage(msg)) {
            // 有任意一个定时时间不会空,就需要转换
            if (checkIfTimerMessage(msg)) {
                if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                    //wheel timer is not enabled, reject the message
                    return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                }
                PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
                if (null != transformRes) {
                    return transformRes;
                }
            }
        }
        // 18个级别的延时消息的逻辑
        if (msg.getDelayTimeLevel() > 0) {
            transformDelayLevelMessage(brokerController, msg);
        }
    }
    return null;
}
  1. 检查消息中是否带有定时标识:TIMER_DELIVER_MS、TIMER_DELAY_SEC(内部测试使用)、TIMER_DELAY_MS(内部测试使用)
  2. 如果为定时消息,则修改消息的Topic为TIMER_TOPIC,以及在消息中保留消息原本的Topic以及queueId。

output.png

定时消息处理-TimeMessageStore

TimeMessageStore中,有着多个Service用于处理定时消息在RocketMQ内部的流转。
定时消息全流程图
正是这五个Service,完成了整个定时消息的逻辑处理。

读取定时消息-TimerEnqueueGetService

先从commitLog消费消息的是TimerEnqueueGetService,该处理器负责从主题为TIMER_TOPIC中读取定时消息,并将消息放入enqueuePutQueue中。
TimerEnqueueGetService中,每隔100ms从TIMER_TOPICconsumerQueue中获取消息在commitLog中的位置信息

  1. 每隔100ms从TIMER_TOPIC的consumerQueue中获取消息在commitLog中的位置信息
  2. 根据获取的commitLog偏移量以及大小,从commitLog中获取消息内容。
  3. 将消息在commitLog的偏移量、大小、定时时间、当前时间以及消息本体封装成一个对象TimerRequest,放入enqueuePutQueue中。

TimerEnqueueGetService处理逻辑

持久化定时消息到TimerLog-TimerEnqueuePutService

此时,需要处理的定时消息存放在enqueuePutQueue中,TimerEnqueuePutService会从其中获取封装好的TimerRequest对象,批量获取后统一处理。

  1. 先判断对象的定时时间是否已经过去了,过去了就马上放到dequeuePutQueue队列中。

output.png

  1. 根据对象的定时时间减去当前写入时间轮的时间要是大于重试的时间,则将该消息对应的TimerRequest魔法值标识为ROLL。

计算公式:

delayedTime - tmpWriteTimeMs >= (long) timerRollWindowSlots * precisionMs;

delayedTime 消息真实的定时时间 tmpWriteTimeMs 当前写入时间轮的时间 timerRollWindowSlots 时间轮中需要循环的槽数 固定值:2天

在计算完ROLL的魔法值后,还会看消息是否携带TIMER_DEL_UNIQKEY属性来设置该消息用于删除其他定时消息。 注:删除其他定时消息的逻辑尚未实装。

output.png

  1. TimerRequest对象中获取以下关键信息持久化到TimerLog中。

    名称 说明 占用字节大小 内容 size TimerLog对象大小 4 固定值:52 prev pos 上一个TimerLog的指针 8 magic 魔法值 4 标识消息的处理逻辑 currWriteTime 写进timerWheel的时间 8 delayTime 在循环中的定时时间 4 offset 在commitLog中的位置 8 size commitLog中的大小 4 hashcode of real topic 真实topic的哈希值 4 reserved value 保留值 8
  2. 根据当前写入时间轮时间获取时间轮对应的槽后,将上面第3步持久化成功后TimerLog得到的位置偏移量更新TimerWheel中,维持时间轮中槽的最后一条timerLog消息位置。

时间轮槽结构:

延时时间 timerLog首条位置 timerLog最后位置 消息条数 魔法数
  1. 等待该批次的TimerRequest都处理完成后,才进行下一次的循环。

TimerEnqueuePutService处理逻辑

TimerEnqueuePutService会使用CountDownLatch等待每批次的消息都处理完成(保存或者入队到dequeuePutQueue中)才进行下一批次的处理。

根据定时时间从TimerLog读取定时消息-TimerDequeueGetService

TimerMessageStore中,TimerDequeueGetService周期性的从TimerWheel以及TimerLog中获取需要处理的消息。

  1. 根据时间轮当前读时间获取对应槽后,将槽中所有该时间点的定时消息属性从TimerLog中读取出来。
  2. 将定时消息属性再次封装成TimerRequest对象,根据魔法值决定放入哪条列表中。
    1. deleteMsgStack:TimerRequest所对应的消息中携带了TIMER_DEL_UNIQKEY属性
    2. normalMsgStack:普通的消息
  3. 将列表中TimerRequest对象根据消息所在commitLog文件进行分类。
  4. 分类后列表放进dequeueGetQueue中。

TimerDequeueGetService处理逻辑

TimerDequeueGetService在处理每一批次的TimerRequest对象时,类似TimerEnqueuePutService一样,也使用了CountDownLatch来等待每批次的消息处理完成。

读取原本消息内容-TimerDequeueGetMessageService

  1. dequeueGetQueue中获取消息同在一个commitLog的TimerRequest对象列表。
  2. 从TimerRequest对象中获取消息在commitLog的位置以及大小后,获取commitLog中保存真实的消息。
  3. 根据魔法值判断本条消息的类型,如果是删除类型的,则将要删除消息的id放进Set中。

dequeueGetQueue取出的TimerRequest中,都共同持有同一个Set的引用。在这一步,正是将id放进共同引用的Set中。

  1. 判断消息id在不在Set中,在则不处理该消息。

其中,第3、4步在当前的版本中,仅用于单元测试,并不涉及功能。

  1. 将真实的消息保存在TimerRequest后,将TimerRequest放进dequeuePutQueue中。

投递消息-TimerDequeuePutMessageService

  1. dequeuePutQueue队列获取TimerRequest中的消息进行处理。
  2. 根据魔法值来决定是否需要还原为真实的Topic。

在这一步中,就是整个定时消息循环处理的最后一步了。要是魔法值为ROLL的话,标明该消息尚未到达真实定时时间,只是到达了该消息在本次循环中的时间。

TimerRequest的魔法值作用

初始化位置:在TimerEnqueuePutService中,根据消息定时时间以及消息中是否携带删除标识(TIMER_DEL_UNIQKEY)决定

  • DEFAULT:无作用
  • ROLL:表明本条定时消息需要在TimerMessageStore的处理中循环处理,直到真正达到消息的定时时刻。

该魔法值也是为了解决消息在commitLog的存储时间问题,要是消息的定时时间过长,将会导致消息超过消费时间,从而被RockerMQ清理导致消息丢失。消息每次在被TimerMessageStore处理时,都会从commitLog取出并消费,此时就会更新销毁时间,那么消息丢失的问题便解决了。

  • DELETE:无实际意义,目前仅用于单元测试。个人猜测在未来的版本中将提供删除未消费定时消息的功能。

总结

RocketMQ5中的定时消息使用起来虽然简单,但是在消息循环处理的过程中,涉及到了多个处理器对其进行处理。而且上面每一个Service都是一个单独的线程来进行处理,其中TimerDequeueGetMessageService以及TimerDequeuePutMessageService更是开了三个线程来同时处理。
本文引用:社区在讨论什么?《Support Timing Messages with Arbitrary Time Delay》

下期预告:定时消息宕机时保存以及定时刷盘机制