掘金 后端 ( ) • 2024-06-28 15:51

在当今数字化时代,众多应用场景对精确的定时任务处理能力有着迫切需求,像电商平台的限时促销、社交网络的定时消息推送、金融系统的定时结算等等。

实际应用场景丰富多样,如何构建一个通用、功能完备且能满足大部分场景需求的系统,是我们需要思考的问题。

以预约场景为例进行设计,假设预约量巨大、不稳定,且存储时间长(如几个月甚至更久之后才执行),从这几个问题出发,设计出的系统往往才具有普遍适用性。

我们可以简单预想到,设计的功能必须囊括下面几个点

  1. 按需设置定时任务。
  2. 能够修改未到执行时间的任务属性(包括执行时间)。
  3. 可以查询任务情况,并能人工干预任务的执行。

此外,还有更多基于技术层面的问题需要考量:

分类 问题 性能 面对海量延时任务,如何确保某个任务能在指定时间内执行,避免过度延迟? 可用性 保证任务不重不漏地执行,杜绝丢任务、重复执行的情况确保服务尽量不宕机,能够抵御突发的大量请求当系统宕机或者任务失败/丢失等情况发生时,可以进行恢复 安全 如果延时任务在执行过程中涉及到对数据的修改,而在这段延迟期间,相关数据又被其他操作修改,可能导致数据不一致的情况

暂且抛开这些问题,我们先不追求实现所有目标,而是提出一些业内常见的实现方案,以便与最终方案进行对比。

无所不能的数据库?

在系统规模较小时,可以通过一个服务/进程定时扫描数据库,依据执行时间字段判断是否到达执行时间,从而实施定时操作。

事实上,确实有许多定时系统如此实现,因其足够简便,且支持集群操作。

缺点也足够明显

  1. 对服务器内存消耗大
  2. 存在延迟,执行时间粒度和mysql本身的速度都会影响
  3. 假设已经存在几千万条数据,即使每隔几分钟这样扫描一次,数据库损耗都极大

语言自带的延时库

几乎所有的编程语言,官方库都会有延时队列的实现,Go的delayqueue,Java的DelayQueue

直接使用完全可行,实际上,若无特殊需求,应尽量使用语言自带的库,避免过度设计。

从这一角度看,DelayQueue 无疑是一种简单且出色的实现。但这些库普遍存在一个通病,即基本是利用语言特性结合内存来实现,对于大型项目或追求高可用性的系统而言,并非适用的方案。

开源MQ

先上几个主流队列的对比

特性 RabbitMQ RocketMQ Kafka ActiveMQ 生产者-消费者模式 支持 支持 支持 支持 订阅-发布模式 支持 支持 支持 支持 重放 支持 支持 - 支持 支持多语言 语言无关 支持 支持 支持,Java优先 单机吞吐量 万级 万级 十万级 万级 消息延迟 微妙 只支持特定的延迟级别,是固定的预设值 毫秒 毫秒 可用性 高(支持主从) 高 高(分布式) 高(支持主从) 消息丢失 低(取决配置) - 理论上不会丢失(取决配置) - 消息重复 可配置 - 理论上会有重复 - 文档/社区 高 中 高 高

具体说明下比较常用的RabbitMQ的实现:

RabbitMQ

RabbitMQ 本身并不直接提供对延迟队列的支持,需要依靠 RabbitMQ 的TTL以及死信队列功能,来实现延迟队列的效果。

那就让我们首先来了解一下,RabbitMQ 的死信队列以及 TTL 功能。

死信队列

死信队列(Dead Letter Queue,简称 DLQ)是消息队列中的一个特殊队列。
当消息在正常的队列中无法被成功处理(例如被拒绝、超时未消费、达到最大重试次数、消息队列到达最大长度等),这些无法处理的消息就可能被转移到死信队列中。

消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

image-2024-6-28_14-6-22.png

单单依赖上面的队列还不足以满足要求,因为实际使用,队列的数据往往是需要无序的,比如我之前插入一个月后触发的任务,后面插入一个一个星期之后触发的任务。

但是RabbitMQ 在检查消息是否过期时,只会检查第一个消息是否过期,并不会校验后面消息过期的情况比如第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。

所以,我们还需要一个插件,贴上地址 :https://www.rabbitmq.com/community-plugins

(具体插件的实现可以自行去了解下,简单来说就是,使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存。这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =<ERL_MAX_T,如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了)

使用 RabbitMQ 来实现延迟队列,具有很明显的一些优势:

比如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃

另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。

关于MQ

之所以在MQ里面只列举了Rabbit MQ的实现,当然还有其他MQ可以实现,有兴趣还可以去了解下Beanstalkd,诞生的立意就是为了解决队列,包括延时队列的。

(Beanstalkd是一个开源的高性能分布式内存队列系统,设计用于处理大量并发任务。它提供了一个简单而强大的API,使得开发者能够轻松地将任务放入队列、获取队列状态以及处理队列中的任务。Beanstalkd具有高可用性、可扩展性和灵活性,适用于各种规模的应用程序,也不乏有大公司在使用,比如Facebook,国内的go-zero也是基于此开发的队列)

其实当你考虑要使用用众多MQ来实现延迟队列时,你就得清晰知道一件事,你是在逼着MQ做不擅长的事情,MQ的定位就是消息队列,而不是替你保存动辄一个月才执行的消息然后精准执行。

先不去考虑问题有哪些,先看本质

延时队列相比于普通队列最大的区别就体现在其延迟的属性上。

普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。

从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

Redis zset

同样是业务被众多采用的实现方案,通过 Redis Zset 实现延迟队列是一种理解起来较为直观,可以快速落地的方案。

并且我们可以依赖 Redis 自身的持久化来实现持久化,使用 Redis 集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。

简单的实现图如下:

image-2024-6-28_14-31-5.png

入队操作:我们将需要处理的任务,按其需要延迟处理时间作为 Score 加入到 ZSet 中(Redis 的 ZAdd 的时间复杂度是O(logN)N是ZSet 中元素个数,写入效率还是可以的)。

起一个进程定时(比如每隔一秒)查询 ZSet 中 Score 最小的元素,查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;

不过对于redis来说同样有缺点:

  1. REDIS虽然可以持久化,但不是百分百可靠:REDIS最高级的持久化配置就是每次操作都记录,但是由于性能问题,基本不可能这样配(可以考虑数据持久化到mysql)
  2. 同样对已有的架构引入了新的组件,但是REDIS足够简单

对于引入REDIS,其实兼容缺点对于开发者来说,是相对简单的

  1. 担心消息会丢,那我们引入mysql 存储,通过mysql锁来解决重复消费/处理
  2. 担心消息互相影响,再引入MQ,比如Kafka解耦,只做执行任务的操作,对于这个层级来说,就没有延迟任务的概念了

简单的流程处理图如下,仅供参考:

企业微信截图_41c7e0c6-9a20-4a27-857d-f99e30c6a65c.png

到底如何设计?

其实各种实现整合来看

  1. 无论是基于死信队列还是直接使用语言内存,抑或是数据先存储(mysql/redis)后投递(kafka),本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,这么做一方面降低耦合度,另一方面也是为了降低数据不可控的时间。
  2. 既然选择了数据分离,整条链路的存储组件和队列组件的选择,按需选择,十分重要,要从本身已有架构的现状出发,选择最适合本身的实现。
  3. 无论是检查队头消息TTL还是调度存储的数据,本质上都是通过定时任务来完成的,定时任务的触发策略也是决定你方案优劣的决定性因素:你是crontab配置,还是主备选举策略、还是大家一起抢分布式锁,也值得根据具体情况具体分析

如果感兴趣,可以再留意我们,我们会再更新关于定时任务(延时队列)的业务使用场景的分享,敬请期待~