掘金 后端 ( ) • 2024-04-20 09:52

RocketMQ官网:https://rocketmq.apache.org/zh/ RocketMQ项目源码:https://github.com/apache/rocketmq

RocketMQ 是什么

RocketMQ是一个分布式开源消息中间件,整个RocketMQ项目由阿里巴巴团队用Java语言开发完成。

2016年捐赠给了Apache基金会,Apache RocketMQ 是国内首个非 Hadoop 生态体系的顶级项目,是Apache的顶级项目(TLP),是目前最流行的消息中间件之一。

RocketMQ的核心特性

我们截取官网的一张图,非常直观和明确

为什么选择RocketMQ

消息中间件有非常多,比如大家熟知的Kafka,ActiveMQ,RabbitMQ等等,对于技术我们不说绝对的优劣,但是经过阿里巴巴多年的双十一等海量并发场景下的历练,RocketMQ在可靠性,高并发,低延迟等方面有着巨大的优势,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

当然,对于RocketMQ和其他中间件的详细差别,可以到官网进行学习,此处不过多赘述。

RocketMQ核心概念

主题(Topic)

topic简单来说可以理解为一类消息,即消息的分类,官方建议将不同业务类型的消息定义成不同的主题,从而实现消息存储和订阅的隔离性

注意,topic为逻辑概念,topic内部由多个队列组成,队列实现了消息的存储

在RocketMQ5.X的版本之后,当我们在定义topic指定了消息的类型时,我们发送的消息必须严格按类型来发送,否则会抛出消息类型不匹配的异常,拒绝消息发送

消息类型会在后面介绍到。

队列(MessageQueue)

在上面topic处的图我们可以看出,一个topic内部有多个MessageQueue,是消息的最小存储但愿,多个队列提高了消息的处理能力和扩展性。

下面是摘自官网描述的存储模型

存储顺序性

  • 队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。

流式操作语义

  • Apache RocketMQ 基于队列的存储模型可确保消息从任意位点读取任意数量的消息,以此实现类似聚合读取、回溯读取等特性,这些特性是RabbitMQ、ActiveMQ等非队列存储模型不具备的。

消息(Message)

message是RocketMQ的数据传输最小单元,所有的数据都会封装为message来发送

消息在整个MQ中是不可变的,并且持久化

message有一些关键参数

  • Topic:上面已经介绍
  • ID:每条消息的全局唯一ID
  • Tag:可以理解为在Topic的基础上继续细分
  • 消费重试次数: 消息消费失败后,RocketMQ服务端重新投递的次数

生产者(Producer)

生产者的概念比较易懂,即消息的生产方,同一个生产者可以向不同的topic发送消息。

消费者(Consumer)

消费者是RocketMQ中用来接收消息和处理消息的实体,可定义消费重试策略,后面会详细解释消费重试

  • 支持以推(push),拉(pull)两种模式对消息进行消费。
  • 同时也支持集群方式和广播方式的消费。
  • 提供实时消息订阅机制,可以满足大多数用户的需求。

消费者组(ConsumerGroup)

即对同一个消费行为的一个负载均衡的消费组。主要目的是负载均衡和容灾,也便于扩容。

消费位点(Offset)

消费位点保存了消息消费到了队列中的哪一个位置,防止重复消费

代理服务器(Broker)

代理服务器主要负责消息的存储,或者说是负责MessageQueue的存储,并提供投递和查询的功能。

Broker中可以有多个Topic,以Master-Salver的方式来部署。

看下面的图会更加直观:

名字服务器(NameServer)

NameServer其实就是注册中心,可以理解为ZooKeeper,负责Broker的注册和服务发现。

NameServer以集群的方式部署但是互相之间无状态不通信,一个挂了其他正常工作无影响。

订阅关系(Subscription)

订阅关系是针对消费者的,描述了消费者获取消息的状态。

  • 消息过滤规则:用于控制消费者在消费消息时,选择主题内的哪些消息进行消费,设置消费过滤规则可以高效地过滤消费者需要的消息集合,灵活根据不同的业务场景设置不同的消息接收范围。
  • 消费状态:Apache RocketMQ 服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。

RocketMQ整体工作流程

1、启动 NameServer

启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心。

2、启动 Broker

启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。

3、创建 Topic

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。

4、生产者发送消息

生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker发消息。

5、消费者接受消息

消费者接受消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

消息类型

普通消息

普通消息是MQ中最基础的一种消息,当我们对消息的处理时机和处理顺序不关心的时候可以用普通消息。

比如官网的一个例子:

上游将用户下单支付这个业务封装成消息发送给MQ,每一个消息都是独立的一个订单,没有任何关联或者前后顺序要求,此时用普通消息即可。

定时/延时消息

延时消息是指客户端在发送message时,指定一个时间t,当消息到达MQ之后,经过时间t之后才会将消息推送给消费者;

最经典的一个例子就是取消订单,我们购物时下单未付款时,会有比如30分钟的付款期,此时就可以用延时消息,避免频繁扫描数据库带来的性能瓶颈,MQ的高并发和水平扩展能力很强。

顺序消息

顺序消息指消费者处理消息的顺序一定要严格按照生产者生产消息的顺序来处理。

比如以官网数据实现增量同步的例子:

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

那么问题来了,如何保证消息的顺序?我们直接看看官网是怎么说的:

  • 生产顺序性

    • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
  • 消费顺序性、

    • 投递顺序

      • Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。
    • 有限重试

      • Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

必须同时满足生产顺序性和消费顺序性才能保证顺序消息

事务消息

事务消息能分布式场景下保障消息生产和本地事务的最终一致性。

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    1. 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    2. 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

消费模式

  • 集群模式

当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。

可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。

  • 广播模式

当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

即使扩缩消费者数量也无法提升或降低消费能力

  • 负载均衡

Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等

消费类型

  • PUSH型 Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

原理:

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

  • PULL型 Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

总结

结合RocketMQ的官网和个人在使用上的一些认知来分享了一下RocketMQ的最基本的知识点,如果大家发现有不正确的地方或者有疑惑的地方欢迎评论区指出。

后面会继续补充消费重试,重复消费,解决消息发送失败等内容,并且会更新Java SDK来使用RocketMQ的详细教程,欢迎大家关注。