掘金 后端 ( ) • 2024-05-06 15:57

theme: channing-cyan

RocketMQ在国内系统中使用还是比较多的,面试中,工作中常常被提及,以前有逐帧阅读过RocketMQ的源码,长时间不回顾,也忘得差不多了。这次就再拾起来重新回顾一番,也算加深下印象。

这篇文章就详细说说RocketMQ中的Broker,Broker是消息存储的实际节点,负责消息的存储、传输和消费者的请求响应等功能。一个RocketMQ集群可以包含多个Broker,每个Broker负责存储一部分消息数据。

下面详细解释一下Broker的功能和特点:

消息存储

Broker负责存储生产者发送的消息,这些消息被持久化到磁盘上,以保证数据的可靠性和持久性。消息存储通常采用的是写入磁盘的方式,以防止消息在服务宕机或网络异常情况下丢失。

那么消息是如何存储的呢?

1. 消息存储文件

当生产者发送消息到 RocketMQ Broker 时,消息首先被写入消息存储文件(CommitLog)。CommitLog 是一个顺序写入的文件,它记录了所有发送到 Broker 的消息内容。消息在 CommitLog 中以顺序追加的方式存储,每条消息会附带一些元数据,如消息的大小、属性等。

就像一个快递员将快递放入一个特定的快递柜中一样,生产者发送的消息首先被写入消息存储文件(CommitLog)。这个存储文件就像是一个特殊的快递柜,记录了所有发送到 RocketMQ Broker 的消息内容。每个快递包裹在柜子中都有一个唯一的位置,并且附带有一些关键信息,比如快递的大小、收件人信息等。

2. 消息索引文件

为了提高消息检索的效率,RocketMQ 使用了消息索引文件(Index File)来加速消息的检索。索引文件中存储了消息的关键信息,如消息的偏移量、消息的关键字等。通过索引文件,RocketMQ 可以快速定位到消息在 CommitLog 中的物理位置,从而提高了消息的读取速度。

就像是在快递柜上贴了一张清单一样。这个清单中记录了每个快递包裹的关键信息,比如快递的编号、收件人信息等。通过这个清单,快递员可以快速找到特定快递的位置,提高了取件的效率。

3. 消息存储结构

消息在存储文件中以一定的格式存储,通常包括消息的长度、魔数(用于校验)、消息的标识符、消息的属性等信息。存储文件中的消息是以顺序追加的方式存储的,保证了消息的顺序性。

每个快递包裹都有一定的标识和信息,比如快递的大小、重量、目的地等。

4. 消息存储过程

当生产者发送消息时,消息首先被追加到 CommitLog 中,然后 RocketMQ 根据消息的主题、队列等信息将消息存储到对应的消息队列中。消息队列中的消息按照存储文件和索引文件的方式进行存储和管理,保证了消息的持久化和高效检索。

当生产者发送快递时,快递员首先将快递放入快递柜中,然后根据快递的目的地将快递放入对应的格子中。

5. 消息文件刷盘

为了确保消息的持久化,RocketMQ 会定期将消息存储文件中的数据刷盘到磁盘上,以防止数据丢失。消息存储文件的刷盘策略可以根据配置进行调整,通常包括同步刷盘和异步刷盘两种方式。

就像是快递员定期清理快递柜一样,以防止数据丢失。

消息传输

Broker负责将生产者发送的消息传输给消费者。它接收生产者发送的消息,并将消息存储在消息队列中,等待消费者消费。当消费者请求获取消息时,Broker会将消息传输给消费者。 那么消息具体是如何传输的呢?

1. 生产者发送消息

当生产者产生消息并发送到 RocketMQ Broker 时,它首先需要与 Broker 建立连接。生产者使用指定的 Topic 将消息发送到 Broker。生产者与 Broker 之间的通信可以基于 TCP 协议或者其他网络协议。

当我们想要发送快递的时候,我们需要先与快递员取得联系,使用指定的快递单号来发快递。

2. Broker 接收消息

当 Broker 接收到生产者发送的消息后,它会将消息写入 CommitLog 中,并根据消息的主题、队列等信息将消息存储到对应的消息队列中。此时,消息已经被 Broker 接收并持久化存储。

当快递员收到包裹之后,快递员会将包裹写入快递柜(CommitLog),并根据快递的目的地(消息的主题、队列等信息),将包裹存放到相应的格子中。此时,包裹已经被快递员接收并持久化存储。

3. 消费者订阅消息

在消费者需要获取消息时,首先需要订阅特定的主题(Topic)。消费者与 Broker 之间也需要建立连接,然后向 Broker 发送订阅请求,表示对某个主题下的消息感兴趣。

收快递的小伙伴,会拿到自己的快递单号,也需要联系快递员,说是这个快递是自己的。

4. Broker 提供消息

当消费者订阅了特定的主题后,Broker 将会为该消费者提供消息。当消费者发送拉取消息的请求时,Broker 会从相应的消息队列中获取消息,并将消息发送给消费者。消费者可以根据自己的消费能力和需求,按照一定的频率从 Broker 获取消息。

当收快递的小伙伴,确定了自己收货的快递单号之后,快递员就会给这个小伙伴派送快递。小伙伴也可以去快递中心自提快递,快递员会找到这个快递并交给这个小伙伴。要是一次拿不完,可以根据自己的能力,分多次拿。

5. 消息传输协议

在 Broker 和客户端之间的消息传输通常基于 TCP 协议。RocketMQ 提供了基于 TCP 的通信协议,用于生产者和消费者与 Broker 之间的消息传输。除此之外,RocketMQ 还支持其他通信协议,如 HTTP、SSL 等,以满足不同的通信需求。

想联系快递员的时候,可以发短信,也可以打电话,也可以发消息。

消费者的请求响应

当消费者请求消费消息时,Broker负责响应消费者的请求,并将消息传输给消费者。Broker根据消费者的订阅关系,从消息队列中获取相应的消息,并将消息传输给消费者进行处理。 broker具体是如何响应消费者的请求的呢?

1. Broker 维护消费者信息

当 Broker 收到消费者的订阅请求后,会维护消费者与订阅的主题之间的映射关系。这样,Broker 就知道了哪些消费者订阅了哪些主题,以便在有消息需要推送给消费者时能够准确地选择目标消费者。

就像 Broker 维护消费者信息一样,快递员会记录每个收件人的地址和联系方式。这样,当有快递需要发送时,快递公司就知道应该将快递送到哪里。

2. 消费者拉取消息

如果是拉取模式消费,消费者会向 Broker 发送拉取消息的请求。请求中通常包含了消费者的 ID、消费的主题、消息队列的偏移量等信息。Broker 接收到拉取消息的请求后,会根据请求的参数从相应的消息队列中获取消息,并将消息返回给消费者。

收件人会到快递公司或者快递点主动领取包裹。他们会提供自己的身份信息以及包裹的相关信息,比如取件号码等。然后快递员根据这些信息找到相应的包裹,并交给收件人。

3. 消费者订阅更新

如果消费者订阅的主题有新的消息到达,Broker 会根据消费者订阅的模式(拉取模式或推送模式)向消费者发送消息。对于推送模式,Broker 会主动向消费者推送消息;对于拉取模式,Broker 会等待消费者来拉取消息。

如果有新的包裹到达,快递公司会根据收件人提供的联系方式通知他们。对于快递公司的推送模式,他们会主动通知收件人有包裹;对于拉取模式,收件人会等待快递公司通知他们有包裹到达。

4. 消息确认

在消费者成功处理完一条消息后,通常会向 Broker 发送消息确认(ACK)请求,告知 Broker 消息已经被消费成功,可以进行下一步的消息推送或者拉取。Broker 收到消息确认后,会更新消息消费进度,以便下次推送或拉取消息时能够跳过已经消费过的消息。

当收件人成功领取了包裹后,他们会在快递单上签字确认收件。这类似于消费者向 Broker 发送消息确认,告知快递公司包裹已经被成功领取,可以进行下一步的快递送达或者通知。

5. 超时处理

如果消费者在一定时间内没有发送确认消息,Broker 可能会将未确认的消息重新发送给消费者,或者将未确认的消息标记为消费失败,进行相应的处理。

如果收件人在一定时间内没有领取包裹,快递公司可能会再次通知或者尝试送达;如果仍然没有成功,他们可能会将包裹退回寄件人或者进行其他处理,比如联系收件人确认地址等。

水平扩展

RocketMQ集群可以包含多个Broker,每个Broker负责存储一部分消息数据。这种设计使得RocketMQ能够水平扩展,通过增加Broker节点来提高消息存储的容量和吞吐量。

那么如何进行Broker的水平扩展呢?

1. 添加新的 Broker 节点

在需要进行扩展的 RocketMQ 集群中添加新的 Broker 节点。新的 Broker 节点可以部署在不同的物理服务器或虚拟机上,以实现分布式的消息存储和处理。

在快递服务中,可以开设新的派送中心。这些中心可以位于不同的地理位置,以满足不同区域的需求,并实现快递服务的分布式处理。

2. 配置 Broker 参数

配置新的 Broker 节点的参数,包括 Broker 的名称、IP 地址、端口号、存储路径等。确保新的 Broker 节点的配置与现有节点的配置保持一致,以便与现有节点进行协同工作。

新的派送中心需要配置相关参数,比如地址、服务范围、配送时间等。确保新中心的配置与现有派送中心保持一致,以便协同工作。

3. 同步配置文件

确保新的 Broker 节点的配置文件与现有节点的配置文件保持一致。可以手动复制现有节点的配置文件到新的节点,或者使用自动化工具来同步配置文件。

确保新的派送中心与现有中心的操作规程一致。可以通过培训和手册等方式将操作规程同步给新的中心。

4. 启动新的 Broker 节点

启动新的 Broker 节点,并确保其能够成功加入到集群中。在启动过程中,可以监控日志输出和运行状态,确保新的节点能够正常工作。

启动新的派送中心,并确保它能够顺利加入服务网络。在此过程中,需要监控其运行状态,以确保正常运作。

5. 数据迁移和负载均衡

如果需要将现有的消息数据迁移到新的 Broker 节点上,可以使用 RocketMQ 提供的数据迁移工具或者自行编写脚本来实现数据的迁移。同时,可以通过负载均衡策略来调整消息的分布,使得消息能够均匀地分布在所有的 Broker 节点上。

如果需要将现有的派送任务分配给新的派送中心,可以通过重新规划路线或者调整任务分配来实现。同时,确保派送任务能够均匀地分布在所有的派送中心上。

6. 更新路由信息

当新的 Broker 节点加入集群后,需要更新 Name Server 中的路由信息,确保客户端能够正确地发现并连接到新的节点。Name Server 会定期向所有的 Broker 节点发送心跳消息,以更新路由信息。

当新的派送中心加入服务网络后,需要更新相关信息,比如网站上的服务范围和联系方式等,以便客户能够及时获取最新信息。

7. 监控和调优

在新的 Broker 节点加入集群后,需要进行监控和调优工作,以确保集群的稳定运行和高效处理消息。可以使用监控工具来监控集群的运行状态,并根据需要进行性能调优和资源优化。

新的派送中心加入服务网络后,需要进行监控和优化工作,以确保服务的稳定性和高效性。可以通过监控工具来监控派送任务的执行情况,并根据需要进行性能调优和资源优化。

负载均衡

当一个消息队列的消息量过大或者某个Broker的负载过高时,RocketMQ会通过负载均衡策略自动将消息分配到其他Broker上,以实现集群的负载均衡和高可用性。

那么broker的负载均衡是如何实现的呢?

1. 消息队列划分

RocketMQ 中的每个主题(Topic)都会被划分为多个消息队列(Message Queue)。消息队列的数量取决于配置和需求,通常情况下,一个主题会被划分为多个消息队列,以实现消息的并行处理和负载均衡。

快递服务可以将区域划分为不同的分区,每个分区对应一个区域或者一组区域的快递需求。这样,每个分区都有自己的快递队列。

2. 消费者组

消费者可以以消费者组(Consumer Group)的形式订阅特定的主题。每个消费者组中可以包含多个消费者实例,这些消费者实例共同消费主题下的消息。

一个公司的小伙伴收同一类快递的时候,就可以认为是一个消费者组,这些小伙伴都收这一类快递,效率会提高很多。

3. 负载均衡算法

RocketMQ 使用的是一种分区(Partition)的负载均衡算法。具体来说,每个消息队列被分配给一个或多个消费者实例,以确保每个消息队列都能被消费者实例消费。消费者实例之间通过负载均衡算法来均匀地分配消息队列,从而实现消息的均衡消费。

这个可以理解为拉过来好几车快递给同一个公司的好多小伙伴,这时候,每个车就对应几个小伙伴,确保每个车后边都有对应的。不会遗漏。

4. Rebalance 机制

RocketMQ 提供了自动的 Rebalance 机制来实现消费者实例的负载均衡。当消费者实例加入或退出消费者组时,RocketMQ 会自动重新分配消息队列,以确保每个消费者实例消费的消息数量大致相同。

当有小伙伴因为其他原因不能来收快递的时候,就需要重新分配这个对应关系。确保每个快递车后边收快递的小伙伴大致相同。

5. 消费者的并发处理

每个消费者实例可以设置多个线程来并发地消费消息。消费者实例内部会负责将消息队列中的消息分配给不同的线程进行处理,从而提高消息的处理效率。

每个收快递的小伙伴可以同时处理多个包裹。效率嘎嘎快。

消息复制

为了保证消息的可靠性和容错性,RocketMQ支持消息的主从复制机制。即使某个Broker宕机,也能够通过备份的方式快速恢复数据,从而保证消息不会丢失。

Broker的消息复制又是如何进行的呢?这里再用快递的例子就不合适了。我们换一个来进行讲解说明。

1. 主从模式

RocketMQ 中的 Broker 通常以主从模式工作,其中一个 Broker 被选为主 Broker,负责处理消息的写入和复制;其他的 Broker 则作为从 Broker,负责从主 Broker 复制消息并提供消息的读取服务。

在一个大型公司的办公室文件传递系统中,通常会有一个主文件管理员负责管理所有文件的分发和收集。其他部门或小组的成员则充当从文件管理员,负责接收文件并传递给指定的人员。

1. 消息同步

当主 Broker 接收到生产者发送的消息时,会将消息写入到本地的 CommitLog 中,并同时将消息发送给所有的从 Broker。从 Broker 收到消息后,会将消息写入到本地的 CommitLog 中,实现消息的复制。

当主文件管理员收到新的文件时,他会在文件记录中标注并通知所有相关部门或小组的成员。这样,每个部门或小组的成员都能够及时收到文件,并进行相应的处理。

1. 同步策略

主 Broker 发送消息给从 Broker 时,可以采用同步的方式进行复制,也可以采用异步的方式进行复制。同步复制可以确保主从 Broker 之间的数据一致性,但会影响主 Broker 的性能;异步复制可以提高主 Broker 的性能,但可能会出现数据不一致的情况。

主文件管理员可以选择同步或异步方式进行文件的传递。同步方式会在文件传递时等待所有相关部门或小组的确认,以确保文件的一致性;而异步方式则会直接传递文件,不等待确认,以提高传递效率。

1. 复制延迟

由于网络延迟等因素,从 Broker 接收到消息的时间可能会有一定的延迟。RocketMQ 中通常会设置一个复制延迟阈值,当从 Broker 复制消息的延迟超过阈值时,主 Broker 会主动尝试重新发送消息,以确保数据的一致性。

由于部门或小组之间的通信可能会受到网络延迟等因素的影响,所以有时候文件的传递可能会有一定的延迟。这时,主文件管理员可能会重新发送文件或采取其他措施,以确保文件的及时传递。

1. 故障恢复

如果主 Broker 发生故障或者宕机,系统会自动选举新的主 Broker 来替代原来的主 Broker。同时,从 Broker 会自动切换到新的主 Broker 上进行消息复制和读取,以确保系统的高可用性和数据的可靠性。

如果主文件管理员因为某种原因无法继续工作,系统会自动将一个备用的文件管理员替代原来的主文件管理员。同时,其他部门或小组的成员会自动切换到新的文件管理员上,以确保文件传递系统的正常运行。

综上所述,Broker作为RocketMQ集群中的核心组件,承担了消息存储、传输和消费者请求响应等重要功能,保障了整个消息中间件系统的可靠性、高效性和可扩展性。