掘金 后端 ( ) • 2024-04-22 16:59

Redisson实现延时队列

版本说明:

  • spring boot 2.6.0
  • redisson-spring-boot-starter 3.28.0

一、加入依赖&配置

 <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.28.0</version>
</dependency>

application.properties

spring.application.name=springboot-redis-delayed-queue-demo

spring.redis.database=2
spring.redis.host=localhost
spring.redis.password=123456
spring.redis.port=6379

二、延时任务添加

package cn.aohan.delayedqueue.provider;

import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.model.TaskData;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
 * @author 傲寒
 * @date 2024/4/19
 */
@Component
public class DelayedQueueProvider {

    private final RedissonClient redissonClient;

    public DelayedQueueProvider(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * 添加延迟任务
     *
     * @param delayedName 延迟名称
     * @param val         值
     * @param delayTime   延迟时间
     * @param timeUnit    时间单位
     */
    public void addDelayedTask(String delayedName, TaskData val, long delayTime, TimeUnit timeUnit) {
        final DelayedTaskInfo task = new DelayedTaskInfo();
        task.setCreateAt(System.currentTimeMillis());
        task.setDelayTime(delayTime);
        task.setTimeUnit(timeUnit);
        task.setVal(val);
        task.setDelayedName(delayedName);
        final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(delayedName);
        delayedQueue.offer(task, delayTime, timeUnit);
    }

    /**
     * 删除任务
     *
     * @param queueName 队列名称
     * @param taskId    任务id
     */
    public void removeTask(String queueName, String taskId) {
        final RBlockingDeque<DelayedTaskInfo> blockingDeque = getBlockingDeque(queueName);
        final Predicate<DelayedTaskInfo> predicate = item -> {
            final TaskData val = item.getVal();
            return Objects.nonNull(val) && Objects.equals(taskId, val.taskId);
        };
        blockingDeque.removeIf(predicate);
        final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(getBlockingDeque(queueName));
        delayedQueue.removeIf(predicate);
    }


    /**
     * 获取阻塞deque
     *
     * @param queueName 队列名称
     * @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
     */
    public RBlockingDeque<DelayedTaskInfo> getBlockingDeque(String queueName) {
        return redissonClient.getBlockingDeque(queueName, JsonJacksonCodec.INSTANCE);
    }

    /**
     * 获取延迟队列
     *
     * @param queueName 队列名称
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(String queueName) {
        return redissonClient.getDelayedQueue(getBlockingDeque(queueName));
    }

    /**
     * 获取延迟队列
     *
     * @param blockingDeque 阻塞deque
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(RBlockingDeque<DelayedTaskInfo> blockingDeque) {
        return redissonClient.getDelayedQueue(blockingDeque);
    }

}

三、监听延时任务到期

延时队列名称常量

/**
 * @author 傲寒
 * @date 2024/4/19
 */
public class QueueConstant {

    /**
     * 测试延迟任务队列 name
     */
    public static final String TEST_DELAYED_TASK_QUEUE = "test_delayed_task_queue";


}

listener监听

package cn.aohan.delayedqueue.listener;

import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 延迟任务监听器
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {

    private final DelayedQueueProvider delayedQueueProvider;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
    }

    public void delayedTaskHandle(String delayedQueueName) {
        final Thread thread = new Thread(() -> {
            final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
            while (true) {
                try {
                    //将到期的数据取出来,等待超时
                    final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
                    if (Objects.isNull(delayedTaskInfo)) {
                        continue;
                    }
                    log.info("DelayedTask task :[{}]", delayedTaskInfo);
                } catch (Exception e) {
                    log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }
}

四、测试延时任务添加

package cn.aohan.delayedqueue.controller;

import cn.aohan.common.dto.Result;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.dto.TestDelayedDTO;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 傲寒
 * @date 2024/4/19
 */
@AllArgsConstructor
@RestController
@RequestMapping("/api/test/delayed")
public class DelayedQueueTestController {

    private final DelayedQueueProvider delayedQueueProvider;

    /**
     * 添加延迟任务
     *
     * @param delayedTask 延迟任务
     * @return {@link Result}<{@link Void}>
     */
    @PostMapping
    public Result<Void> addDelayedTask(@RequestBody TestDelayedDTO delayedTask) {
        delayedQueueProvider.addDelayedTask(
                QueueConstant.TEST_DELAYED_TASK_QUEUE,
                delayedTask.getVal(),
                delayedTask.getDelayTime(),
                delayedTask.getTimeUnit()
        );
        return Result.success();
    }

}

五、大致流程及其原理

在一开始创建延时队列的时候会创建一个QueueTransferTask

org.redisson.RedissonDelayedQueue#RedissonDelayedQueue

image-20240422162041298.png

image-20240422162102279.png channelName = prefixName("redisson_delay_queue_channel", getRawName());

  • 这里并订阅(channel)延时队列创建任务调度(主要是使用netty中时间轮)。
  • 使用pushTaskAsync去操作lua脚本移除redis 中LIST和ZSET的元素。

根据延迟时间插入到对中合适的位置,主要是

org.redisson.RedissonDelayedQueue#offerAsync 方法中的一段lua脚本

local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);
redis.call('zadd', KEYS[2], ARGV[1], value);
redis.call('rpush', KEYS[3], value);
local v = redis.call('zrange', KEYS[2], 0, 0); 
if v[1] == value then 
   redis.call('publish', KEYS[4], ARGV[1]); 
end;

总而言之,这段代码的功能是:

  • 将两个字符串打包成一个二进制值。
  • 将打包后的值添加到一个排序集合(zset)中,并为其指定分数(当前时间+延迟时间)。
  • 将同样的值添加到一个列表的尾部。
  • 如果添加的元素是排序集合中的第一个元素,则向发布一条消息(上边的订阅的channel)。

然后使用BLPOP阻塞的去获取LIST的元素

​ redisson实现延迟队列的原理,简单来说,将数据插入到延迟队列时,会存入到延迟队列的list和zset结构中,通过任务调度的方式将延迟队列中到期的数据取出,然后放入到阻塞队列中,客户端通过BLPOP的命令阻塞的拉取阻塞队列的数据,若拉取到数据就可以进行业务逻辑的处理。

项目源码