掘金 后端 ( ) • 2024-06-18 10:30

开心一刻

今天坐在太阳下刷着手机
老妈走过来问我:这么好的天气,怎么没出去玩
我:我要是有钱,你都看不见我的影子
老妈:你就不知道带个碗,边要边玩?
我:......
后面试了下,边要边玩确实香!!!

狗吃惊.gif

优先级队列

说到队列,相信大家一定不陌生,是一种很基础的数据结构,它有一个很重要的特点:先进先出
但说到优先级队列,可能有些小伙伴还不清楚,因为接触的不多嘛,那我们就来盘盘它
示例基于: RabbitMQ 3.9.11

业务场景

我手头上正好有一个项目,系统之间通过 RabbitMQ 通信,调度系统 是消息生产者, 文件生成系统 是消息消费者。默认情况下,先下发的消息会先被消费,也就是先进队列的消息会先出队列,这个相信大家都能理解。但业务高峰期,重要程度不同的文件都需要生成,那如何保证重要文件先生成了?
方式有但不局限于以下几种

  1. 调整调度
    这个比较好理解,优先级高的文件的调度提前,或者优先级低的文件的调度延后
    1.1 将重要文件的调度提前,保证重要文件的消息先进入队列;但需要考虑调度能否提前,如果生成文件依赖的上游数据还未就绪了?
    1.2 将普通文件的调度延后,有点围魏救赵的感觉,万一某一天不需要生成重要文件,那服务器岂不是有一段时间的空置期,而这段空置期本可以生成普通文件
    总的来说就是不够灵活:有重要文件的时候先生成重要文件,没有重要文件的时候生成普通文件

不够灵活.gif

  1. 提升硬件资源
    这个就不用过多解释了吧,加大 文件生成系统 的硬件配置,提高其文件生成能力,保证文件(不论重要还是普通)都能在调度的时间开始生成,也就无需区分重要与普通了,那么重要文件先生成这个命题就不成立了。想想都美,可实际情况,大家都懂的!

控制成本.jpg

  1. 优先级队列
    RabbitMQPriority Queue 非常契合这个业务场景,详情请往下看

优先级队列

相较于普通队列,优先级队列肯定有一个标志来标明它是一个优先级队列,这个标志就是参数: x-max-priority ,定义优先级的最大值。我们先来看下 RabbitMQ 控制台如何配置

优先级控制台配置.png

相关参数配置好之后,点击 Add queue 即创建出了一个 优先级队列,创建完成之后,你会发现队列上有一个 Pri 标志,说明这是一个优先级队列

pri 标志.png

实际开发工程中,一般不会在 RabbitMQ 控制台创建队列,往往是服务启动的时候,通过服务自动创建 exchangequeue,实现也非常简单

@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(QSL_EXCHANGE, true, false);
    }

    @Bean
    public Queue queue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 5);
        return new Queue(QSL_QUEUE, true, false, false, args);
    }

    @Bean
    public Binding bindingQueue() {
        return BindingBuilder.bind(queue()).to(directExchange()).with("com.qsl");
    }
}

普通队列与优先级队列_代码创建对比.png

服务启动成功后,我们可以在 RabbitMQ 控制台看到队列: com.qsl.queue ,其 x-max-priority 等于 5

代码创建优先级队列_控制台查看.png

消息优先级

消息属性 priority 可以指定消息的优先级。停止服务后,我们手动往队列 com.qsl.queue 中放一些带有优先级的消息,优先级分别是: 3,1,5,5,10,4 对应的消息体分别是: 3,1,5_1,5_2,10,4

消息优先级_控制台配置.png

此时队列中共有 6 个消息准备就绪

6个优先级消息.png

启动服务,进行消息消费,消费顺序如下

6个优先级消息消费日志.png

可以总结出一个规律:优先级高的先出队列,优先级相同的,先进先出
那优先级是 10 的那个消息是什么情况,它为什么不是第一个出队?

优先级是10.jpg

因为队列 com.qsl.queue 的最大优先级是 5,即使消息的优先级设置成 10,其实际优先级也只有 5,这样是不是就理解了?
实际开发过程中,基本不会在 RabbitMQ 控制台手动发消息,肯定是由服务发送消息。我们模拟下带有优先级的消息发送

代码发送优先级消息.png

是不是 so easy !

x-max-priority

值支持范围是 1 ~ 255 ,推荐使用 1 ~ 5 之间的值,如果需要更高的优先级则推荐 1 ~ 101 ~ 10 已经足够使用,不推荐使用更高的优先级,更高的优先级值需要更多的 CPU 和 内存资源
没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息

数据结构

底层数据结构:堆,具体请看:数据结构之堆 → 不要局限于堆排序

ACK超时

之前一直不知道这一点,直到有一次碰到了如下异常

ACK超时异常.png

一查才知道 ACK超时

超时异常

从消费者获取到消息(消息投递成功)开始,在超时时间(默认30分钟)内未确认回复,则关闭通道,并抛出 PRECONDITION_FAILED 通道异常,并且消息会重新进入队列,等待再次被消费
ACK超时的配置项: consumer_timeout ,默认值是 1800000 ,单位是毫秒,也就是 30 分钟,可用命令 rabbitmqctl eval 'application:get_env(rabbit,consumer_timeout).' 查看

查看ack超时配置值命令.png

RabbitMQ 判断是否ACK超时的调度间隔是一分钟,所以 consumer_timeout 不支持低于一分钟的值,也不建议低于五分钟的值,我们不接受这个建议,将 consumer_timeout 调整成 2 分钟,看看超时异常
有 2 种调整方式

  1. 修改 /etc/rabbitmq.conf
    配置文件没有则新建,然后在配置文件中将 consumer_timeout 设置成 120000 (没有该配置项则新增),然后重启 rabbitmq
  2. 动态修改
    执行命令 rabbitmqctl eval 'application:set_env(rabbit,consumer_timeout,120000).' 即可,不需要重启 rabbitmq,但需要注意的是,这种修改不是永久生效,一旦 rabbitmq 重启, consumer_timeout 将会恢复到默认值

我们用第 2 种方式进行调整

动态调整ACK超时.png

然后我们在消费端睡眠 3 分钟后进行ACK

睡眠3分钟后进行ACK.png

最后在 rabbitmq 控制台手动发送一个消息,异常信息如下

2024-02-15 13:08:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6
2024-02-15 13:10:47|AMQP Connection 192.168.3.225:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 120000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|com.qsl.rabbit.listener.TestListener|ERROR|33|消息确认异常:
java.lang.IllegalStateException: Channel closed; cannot ack/nack
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1175)
    at com.sun.proxy.$Proxy50.basicAck(Unknown Source)
    at com.qsl.rabbit.listener.TestListener.onMessage(TestListener.java:31)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.lang.Thread.run(Thread.java:745)
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|1436|Restarting Consumer@2e6f610d: tags=[[amq.ctag-hE7fVqLNKO44ytMHalsf2A]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@3b1ed14b Shared Rabbit Connection: SimpleConnection@13275d8 [delegate=amqp://[email protected]:5672/, localPort= 55710], acknowledgeMode=MANUAL local queue size=0
2024-02-15 13:11:47|org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2|com.qsl.rabbit.listener.TestListener|INFO|28|消费者接收到消息:6

3分钟超时异常.png

从 RabbitMQ 3.12 开始,可以为每个队列配置过期时长,而之前只能为每个 Rabbit 节点配置过期时长

如何处理

方式有但不局限于以下几种

  1. 增加超时时长
    这往往是最容易想到的,默认 30 分钟不行就改成 60 分钟嘛,但并不是无脑增加超时时长,默认值往往是综合情况下比较优的一个值,并不推荐加长
  2. 异步处理
    用线程池处理异步处理消息,及时进行消息ACK,但需要考虑拒绝策略,如果用的是: CallerRunsPolicy ,还是有可能触发ACK超时
  3. 幂等处理
    消息消费做幂等处理,是规范,而不仅仅只是针对ACK超时
    消息正在消费中,或者已经消费完成,这个消息就不应该再次被消费,可以打印日志然后直接ACK,而无需进行业务处理
  4. 自动ACK
    虽然自动ACK可以简化消息确认的流程,但它也可能带来一些潜在的问题,例如:
    消息丢失风险:自动ACK意味着一旦消费者接收到消息,RabbitMQ 就会将其从队列中移除,如果消费者在处理消息时发生故障或崩溃,未处理的消息可能会丢失
    限流作用减弱:ACK机制可以帮助限流,即通过控制ACK的发送速率来限制消费者处理消息的速度。如果使用自动ACK,这种限流作用会减弱,可能导致消费者过快地消费消息,超出其实际处理能力
    缺乏灵活性:自动ACK不允许消费者在处理完消息后再决定是否要确认消息,这限制了消费者的灵活性。例如,消费者可能需要根据消息内容或处理结果来决定是否重新入队或丢弃消息 等等,总之,自动ACK慎用

具体如何处理,需要结合具体业务,选择比较合适的方式

总结

  1. 优先级队列
    通过配置 x-max-priority 参数表示队列是优先级队列
    队列的优先级取值范围推荐 1 ~ 5,不推荐超过 10
    通过属性 priority 可以指定消息的优先级,没有设置优先级的消息将被视为优先级为 0,优先级高于队列最大优先级的消息将被视为以队列最大优先级发布的消息
    优先级高的消息先出队列(先被处理),优先级低的消息后出队列(后被处理),优先级相同的则是先进先出
  2. ACK超时
    ACK超时是一种保护机制,其实可以类比 HTTP 请求超时、数据库连接查询超时
    RabbitMQ 的ACK超时默认是 30 分钟,可以修改配置项 consumer_timeout 进行调整
    至于如何避免ACK超时,需要结合具体的业务选择合适的方式
  3. 示例代码
    spring-boot-rabbitmq