掘金 后端 ( ) • 2024-04-08 13:53

废话不多说,上延迟队列实现实现代码

implementation group: 'org.redisson', name: 'redisson-spring-boot-starter', version: '3.25.0'
@Test
fun contextLoads() {
  val blockingQueue = redisson.getBlockingDeque<String>("TEST-QUEUE")
  val delayQueue = redisson.getDelayedQueue(blockingQueue)
  Thread {
    while (true) {
      try {
        // 必须使用 blockingQueue 的 poll() 方法!
        val d = blockingQueue.poll()
        if (d != null) {
          println(d)
        }
      } catch (e: InterruptedException) {
        e.printStackTrace()
      }
    }
  }.start()

  repeat(5) {
    // 必须使用 delayQueue 的 offer() 方法。为什么有这么个规定?请看下文。
    delayQueue.offer("TEST-DATA$it", 5, TimeUnit.SECONDS)
    Thread.sleep(1000)
  }
  Thread.sleep(10000)
}

可以看到控制台在第五秒时,每隔一秒输出一行代码

以上实现核心有两个:poll()和offer()

先看offer()方法实现:

层层点进去后:

image-20240407222307423

核心在最后一行这个lua命令,格式化后,再举例替换掉参数位:

-- 示例参数
local timeout = 1645833600 -- 2022年2月22日00:00:00(Unix 时间戳),到期时间戳
local random = 'abc123'
local encodedMessage = '{"id":1,"content":"Hello World"}' -- 假设这是编码后的消息内容

-- 构造消息值
local value = struct.pack('Bc0Lc0', string.len(random), random, string.len(encodedMessage), encodedMessage)

-- 添加到超时有序集合
redis.call('zadd', 'myTimeoutSetName', timeout, value)

-- 添加到普通队列
redis.call('rpush', 'myQueueName', value)

-- 发布通知
local v = redis.call('zrange', 'myTimeoutSetName', 0, 0)
if v[1] == value then
    redis.call('publish', 'myChannelName', timeout)
end

综合来看,这段代码实现了以下功能:

  • 创建一个包含到期时间戳、随机标识符和编码消息内容的复合二进制值。
  • 将该值同时存入一个有序集合(用于按到期时间排序和查询)和一个普通队列(用于常规消息处理)。
  • 当处理的消息恰好是有序集合中最接近到期的一个时,通过 Redis 的发布/订阅机制,在 myChannelName 频道发布一条消息,通知其他订阅者有消息即将到期。

pub/sub在这里是干什么的?看一幅图

iShot2024-04-0723.34.15

上面箭头对应了代码创建的队列,pub/sub 机制是为了做队列数据转移工作的,即从前两个队列中移除过期元素到第三个队列中

@Autowired
private lateinit var redisson: RedissonClient

@Test
fun contextLoads() {
  val blockingQueue = redisson.getBlockingDeque<String>("TEST-QUEUE")
  val delayQueue = redisson.getDelayedQueue(blockingQueue)

  // part1
  Thread {
    while (true) {
      try {
        // 必须使用 blockingQueue 的 poll() 方法!
        val d = blockingQueue.poll()
        if (d != null) {
          println(d)
        }
      } catch (e: InterruptedException) {
        e.printStackTrace()
      }
    }
  }.start()

  // part2
  repeat(5) {
    // 必须使用 delayQueue 的 offer() 方法
    
    // 每五秒发送一个延时消息
    delayQueue.offer("TEST-DATA$it", 10, TimeUnit.SECONDS)
    Thread.sleep(5000)
  }


  //        blockingQueue.push("A")
  Thread.sleep(1000000)
}

为了方便你自己调试,我把代码贴在这,试试part1、2分别打开和关闭的组合多试几次,然后手动频繁刷新查看redis存储情况,就非常清楚了。

那转移工作怎么做的?

首先贴上关键代码位置

image-20240407234343229

同样在这个类,其构造方法中,起了一个转移任务。

这样,便每次创建一个delay_queue,都会起一个转移任务,该任务负责

  • 异步将超时有序集合中的到期消息转移到普通队列,并从有序集合中移除。
  • 获取下一次任务的开始时间。
  • 返回一个 RTopic 实例,用于发布/订阅消息到期通知。

该任务怎么调度的?

源码位置在 queueTransferService.schedule(queueName, task)

image-20240408000340389

image-20240408000359363