掘金 后端 ( ) • 2024-04-22 16:23

前言

Hi 你好,我是东东拿铁,一个正在探索个人IP&副业的后端程序员。

今天想和大家聊聊消息队列的一些问题,通过目录结构大家可以看到,都是一些比较基础的问题。

那我为什么还要写这些很多大佬不屑一顾的文章呢?因为身边发生的一些事情,让我发现很多人,即使工作4、5年,也没有去深入了解一下MQ。

  1. 身边团队的同事,即使在用着MQ,也对MQ缺乏最基础的认知,通常是,我说这个地方可以用MQ,大家再去用一下。
  2. 一个同事准备跳槽,但是从来没有用过MQ。在当前就业环境来说,你可以不深入,但是MQ一定要有最基础的了解。

综上,结合自己多年的经验,将工作中最容易遇到的、面试中最高频提问的问题,整理出来提供给大家,希望能对初学MQ的朋友们,提供一点帮助。

为什么需要MQ

  1. 异步处理 比如不重要的一些业务流程,如短信通知、APP PUSH、统计数据
  2. 削峰 使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。最常见的就是秒杀的扣库存逻辑,通过MQ进行削峰,避免DB被打垮。
  3. 服务解藕 比如我们有一个订单中台,订单下游的系统需要获取订单的最新数据,随着业务方增加,需求增加,订单中台就需要不断适配下游系统的修改,来提供最新的接口。 通过MQ,订单信息发生变化时,发送消息,所有的下游系统只需要订阅即可。

如何选择消息队列

RocketMQ

RocketMQ由阿里开源, 使用 Java 语言开发,它的贡献者大多数都是中国人,源代码相对也比较容易读懂,你很容易对 RocketMQ 进行扩展或者二次开发。

RocketMQ 的性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息。

Kafka

Kafka在大数据和流计算领域使用非常广泛,几乎所有的相关开源软件系统都会优先支持 Kafka。

但Kafka不太适合在线业务,其中应用到了非常多的异步逻辑,在每秒消息数不够多的时候,时延可能会增加。 RabbitMQ

RabbitMQ

RabbitMQ 是使用一种比较小众的编程语言:Erlang 语言编写的,它最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一。

其他区别可以看下图。

消息可靠性

一条消息的生命周期,大概可以分成三个阶段。 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到Broker端。 存储阶段: 在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。 消费阶段: 在这个阶段,Consumer从Broker上拉取消息,经过网络传输发送到Consumer上。

在生产、存储、消费阶段,为保证消息可靠行,都有着独自的处理方式,我们一个个来谈。

生产阶段

  1. 请求确认机制:消息队列的客户端会把消息发送到 Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
  2. 重试机制:有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者 异常的方式告知用户。 RocketMQ为例
        try{
        
            MqMsg msg = this.buildMsg(topic, data);
            Message message = new Message(topic.getTopic(), topic.getTags(), msg.serialize().getBytes());
            message.putUserProperty("traceId", TraceLogConstant.getTraceId());
            // mq返回相应结果
            SendResult response = this.getProducer().send(message);
        } catch (Exception e){
            // 注意捕获异常
        }

Kafka提供了异步发送功能,异步时我们要注意异步的回调结果

producer.send(record, (metadata, exception) -> {
if (metadata != null) {
    System.out.println(" 消息发送成功。");
} else {
    System.out.println(" 消息发送失败!");
    System.out.println(exception);
}
});

存储阶段

我们发送的消息,都是以写文件的方式存储的,对于RocketMQ和kafka来说持久化方式分为:同步刷盘 和 异步刷盘。 同步刷盘:SYNC_FLUSH。将消息写入磁盘时,等待返回的ack,代表写入成功,具有最强的可靠性,一般只有特定场景可能会用到。 异步刷盘:ASYNC_FLUSH。消息会先写入内存,然后Broker端会开启一个线程,异步的执行刷盘操作,真正把数据写入磁盘。坏处就是如果宕机,内存的数据可能丢失,好处就是能够极大的增加吞吐量。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消息消费确认机制:RocketMQ 提供了消费者消费确认机制。消费者在成功消费消息后,可以向服务器发送确认消息,告知服务器该消息已被消费,服务器会将该消息标记为已消费,不再推送给该消费者。

消息重试:当消费者处理消息失败时,RocketMQ允许配置消息的重试策略。消费者可以在处理失败时将消息标记为需要重试,并设定重试的次数和间隔。

    @Override
    public boolean receive(List<ReceiveMessage<Msg>> msgList) {
        return true;
    }

如何处理重复消息

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什 么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使 用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消 息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重 复,这个是最高的等级

既然会有重复消息,那么我们如果不对重复消息进行处理,那么可能会导致业务数据问题。

幂等性是指对同一操作的重复执行不会产生额外的影响,即使该操作被重复执行多次,结果也应该与仅执行一次时的结果相同。采用幂等性可以有效解决重复消息的问题,大概有三种解决思路:

利用数据库的唯一约束

每条消息都会带有唯一ID(MsgID),那么我们可以通过这个id来进行判断是否处理过。

使用关系型数据库,建立消息流水表,并对MsgId添加唯一索引,只允许插入一次。

但使用关系型数据库,每一个消息都需要建一张表,太麻烦了。我们可以利用Redis 的 SETNX 命令来替代关系型数据库,如果存在,直接返回,不存在再执行业务逻辑。这也是最常用的方式了。

前置条件判断

在你的数据库中增加版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,可以实现幂等更新。

消息积压

优化消费者性能

绝大部分挤压的情况,都是消费者性能的问题。

设计之初,首先我们要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能持续运行。

如果生产端没有特别大的流量波动,但是出现了消息积压,那么我们就要先排查消费者服务是否正常,是否存在性能问题。

具体排查思路,可以看这篇文章。

当然,提升一下硬件配置,也可以提升性能。

下图是RocketMQ的控制台,查看具体消费情况的截图。

扩容

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。

特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。

如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的。

其实在实际生产中,遇到过已经把Consumer的实例数扩容到和分区一致了,依然有消息积压,此时扩容需要扩展分区了,但是扩展分区需要让所有消费方重启,影响面太大,最后不了了之了。。。

如何保证消息有序消费

RocketMQ 提供了分区有序和全局有序两种消费方式。局部有序是指一个消费者在消费同一个队列的消息时是有序的,全局有序是指一个主题下的所有消息都要保证在消费时是有序的。

分区顺序消息

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

适用场景:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。 示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

全局顺序消息

对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。

全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。

因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。

事务消息

什么是分布式事务?

那什么是事务呢?如果我们需要对若干数据进行更新操作,为了保证这些数据的原子性和一致性,我们希望这些更新操作要么都成功,要么都失败。至于更新的数据,不只局限于数据库中的数据,可以是磁盘上的一个文件,也可以是远端的一个服务,或者以其他形式存储的数据。

比如我们有这么一种场景,用户从购物车下单,支付完成之后,我们发送支付完成消息,购物车服务把已经付款的商品删除,利用RocketMQ,我们怎么来实现呢?

RocketMQ中的分布式事务

先说流程

  1. 实现事务监听器接口:开发者需要实现 RocketMQ 的事务监听器接口(TransactionListener),该接口包括两个方法:executeLocalTransaction 和 checkLocalTransaction。其中,executeLocalTransaction 方法用于执行本地事务,checkLocalTransaction 方法用于检查本地事务的状态。

  2. 发送事务消息:在发送事务消息时,需要指定事务监听器,并在 executeLocalTransaction 方法中执行本地事务操作。RocketMQ 会在发送消息后,调用 executeLocalTransaction 方法来执行本地事务,然后根据返回结果来决定是提交还是回滚事务。

  3. 回查本地事务状态:RocketMQ 在发送事务消息后,会启动一个定时任务来定期回查本地事务的状态。回查的频率和次数可以通过配置来设置。在回查时,RocketMQ 会调用 checkLocalTransaction 方法来检查本地事务的状态,如果返回 COMMIT 状态,则提交事务,如果返回 ROLLBACK 状态,则回滚事务。

  4. 消息回查任务配置:开发者需要合理配置消息回查任务的频率和次数,以确保及时发现本地事务状态,并确保消息的最终一致性。

总结一下,RocketMQ要实现分布式事务,主要是半消息,消息先发送到Broker,但不会立即被消费者消费到。

我们需要提供回查接口,Broker端可以通过我们的回调接口,判断生产者的本地事务是否执行成功。所以我们也说RocketMQ有双向通信的功能。

说在最后

本文带大家了解MQ最常见的几个问题,一起来回顾一下

  1. 为什么需要MQ?
  2. 不同MQ之间,如何做技术选型?
  3. 消息可靠性
  4. 重复消息
  5. 消息积压如何实现
  6. 顺序消息如何实现
  7. 事务消息

不知道你在工作中,处理过哪些消息队列的问题,可以和大家分享呢?欢迎你在评论区与我交流。希望本文能够为你工作和面试中提供一些参考和帮助,看到这里,希望点赞评论支持一下,也欢迎你加我的wx:Ldhrlhy10,一起交流~