掘金 后端 ( ) • 2024-04-22 15:07

theme: healer-readable highlight: a11y-dark

MQ 介绍

消息队列是一种“先进先出”的数据结构

queue1.png

其应用场景主要包含以下3个方面

应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

解耦1.png

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

解耦2.png

流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验。

mq-5.png

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样比总不能下单体验要好。

mq-6.png

出于经济考量目的:

业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

数据分发

这样每次换服务都需要修改代码

mq-1.png

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

mq-2.png

MQ的优点和缺点

优点:解耦、削峰、数据分发

缺点包含以下几点:

  • 系统可用性降低

    系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

    如何保证MQ的高可用?

  • 系统复杂度提高

    MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。

    如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 一致性问题

    A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。

如何保证消息数据处理的一致性?

MQ 产品比较

特性 ActiveMQ RabbitMQ RocketMQ Kafka 开发语言 Java Erlang Java Scala 单机吞吐量 万级 万级 百万级 百万级 时效性 ms级 us级 ms级 ms级内 可用性 高(主从架构) 高(主从架构) 非常高(分布式架构) 非常高(分布式架构) 功能特性 成熟的产品,在很多公司得到应用,有较多的文档,各种协议支持较好 基于 Erlang 开发,所以并发能力很强,性能极好,延时很低,管理界面较丰富 MQ 功能比较完备、扩展性佳 只支持主要的MQ功能,像一些信息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

快速入门

RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。

安装 rocketmq

首先需要安装 maven 和 jdk

通过 apt 安装,apt install maven

绍RocketMQ安装过程。

解压5.2.0的二进制可执行文件

image.png

目录介绍

  • bin:启动脚本,包括shell脚本和CMD脚本
  • conf:实例配置文件 ,包括broker配置文件、logback配置文件等
  • lib:依赖jar包,包括Netty、commons-lang、FastJSON等
unzip rocketmq-all-5.2.0-bin-release.zip

进入 bin 目录进行启动

启动 NameServer

nohup sh mqnamesrv &

image.png

查看日志--如果日志存在表示启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

启动Broker+Proxy

NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。

RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件,修改JVM内存大小

编辑runbroker.sh和runserver.sh修改默认JVM大小

vi runbroker.sh vi runserver.sh

修改以下内容

JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动

nohup sh mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
tail -f ~/logs/rocketmqlogs/proxy.log 

关闭命令

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

windows 安装

配置系统中的环境变量

  • 变量名:ROCKETMQ_HOME
  • 变量值:MQ解压路径\MQ文件夹名(bin目录的上一级即可)

image.png

启动 nameserver

在bin目录下执行cmd命令呼出命令框,执行 start mqnamesrv.cmd

image.png

启动Broker

执行 start mqbroker.cmd -n 127.0.0.1:9876 --enable-proxy &

测试 RocketMq

在bin目录进行操作

发送消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

RocketMQ集群搭建

各角色介绍

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

Producer

首先找 NameServer 询问发送给哪一个 Broker,然后再向Broker 发送信息。

NameServer

管理 Broker,Broker 自己会上报自己状态信息到 NameServer

Consumer

首先去 NameServer 去获取 Broker 地址,然后去向该地址进行消费数据。

Broker

接受消息,存储消息。

RocketMQ角色.jpg

集群搭建方式

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步,因为Broker 会向所有的 NameServer 上报信息,不需要同步。

Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个Master 可以对应多个 Slave,但是一个Slave只能对应一个 Master,Master与Slave的对应关系通过指定相同的。
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId 为 0 表示Master非 0 表示 Slave
Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

集群模式

Broker 集群部署方式

单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

多Master模式 一个集群无 Slave,全是 Master,例如2个 Master 或者3个 Master,这种模式的优缺点如下:

  • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

多Master多Slave模式(异步)

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
  • 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。

多Master多Slave模式(同步)

每个 Master 配置一个 Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

双主双从集群搭建

RocketMQ集群.png

集群工作流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

集群搭建

服务器环境

序号 IP 角色 架构模式 1 192.168.80.137 nameserver、brokerserver Master1、Slave2 2 192.168.80.138 nameserver、brokerserver Master2、Slave1

向 Host添加信息,集群都要设置

vim /etc/hosts

# nameserver
192.168.80.137 rocketmq-nameserver1
192.168.80.138 rocketmq-nameserver2
# broker
192.168.80.137 rocketmq-master1
192.168.80.137 rocketmq-slave2
192.168.80.138 rocketmq-master2
192.168.80.138 rocketmq-slave1

配置完成后, 重启网卡

sudo systemctl restart NetworkManager

防火墙配置

宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙

# 关闭防火墙
sudo ufw disable
# 查看防火墙的状态
sudo ufw status
# 禁止firewall开机启动
sudo systemctl disable ufw

或者为了安全,只开放特定的端口号,RocketMQ 默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:

  • nameserver 默认使用 9876 端口
  • master 默认使用 10911 端口
  • slave 默认使用11011 端口
# 开放name server默认端口
sudo ufw allow 9876
# 开放master默认端口
sudo ufw allow 10911
# 开放slave默认端口 (当前集群模式可不开启)
sudo ufw allow 11011
# 重启防火墙
sudo ufw reload

环境变量配置

sudo vi ~/.bashrc

末尾加入如下命令

# Set RocketMQ environment variables
export ROCKETMQ_HOME=/var/opt/rocket-l/rocketmq-all-5.2.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

更新

source ~/.bashrc

创建消息存储路径

# 创建 RocketMQ 主目录
mkdir /usr/local/rocketmq/store
# 创建 RocketMQ commitlog 目录(用于存储消息数据)
mkdir /usr/local/rocketmq/store/commitlog
# 创建 RocketMQ consumequeue 目录(用于存储消费队列数据)
mkdir /usr/local/rocketmq/store/consumequeue
# 创建 RocketMQ index 目录(用于存储索引数据)
mkdir /usr/local/rocketmq/store/index

broker 配置文件

文件位于 /var/opt/rocket-l/rocketmq-all-5.2.0-bin-release/conf

  • 2m-2s-async/: 异步双主模式配置目录
  • 2m-2s-sync/: 同步双主模式配置目录
  • 2m-noslave/: 双主模式无从模式配置目录
  • broker.conf: Broker 配置文件
  • container/: 容器配置目录
  • controller/: 控制器配置目录
  • dledger/: 分布式日志存储配置目录
  • plain_acl.yml: 普通 ACL 配置文件
  • rmq.broker.logback.xml: Broker 日志配置文件
  • rmq.client.logback.xml: 客户端日志配置文件
  • rmq.controller.logback.xml: 控制器日志配置文件
  • rmq.namesrv.logback.xml: 名字服务器日志配置文件
  • rmq-proxy.json: RocketMQ 代理配置文件
  • rmq.proxy.logback.xml: 代理日志配置文件
  • rmq.tools.logback.xml: 工具日志配置文件
  • tools.yml: 工具配置文件

进入双主双从同步目录

首先通过 JPS 查看启动状态,需要停止运行 borker 和 nameserver 再修改配置文件

位于 /var/opt/rocket-l/rocketmq-all-5.2.0-bin-release/conf/2m-2s-sync

  • broker-a.properties: 代理 A 的配置文件。
  • broker-a-s.properties: 代理 A 的备份/同步(standby)配置文件。
  • broker-b.properties: 代理 B 的配置文件。
  • broker-b-s.properties: 代理 B 的备份/同步(standby)配置文件。

我们打开 broker-a.properties 这是 master 的配置文件

# RocketMQ 代理集群的名称
brokerClusterName=DefaultCluster
# 代理的名称,这里是 broker-a
brokerName=broker-a
# 代理的唯一标识 ID
brokerId=0
# 24小时制,表示凌晨4点删除消息文件
deleteWhen=04
# 文件保留时间达到指定小时后,执行文件删除任务
fileReservedTime=48
# 代理角色,SYNC_MASTER 表示同步主节点
brokerRole=SYNC_MASTER
# 磁盘刷写类型,ASYNC_FLUSH 表示异步刷盘
flushDiskType=ASYNC_FLUSH

在 192.168.80.137 服务器上修改broker-a.properties的配置信息

角色:master1

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

在 192.168.80.137 服务器上修改broker-b-s.properties的配置信息

注意文件是 broker-b-s 因为是从节点

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

主从不一样的地方

  • brokerName:broker名字
  • brokerId:唯一ID
  • brokerRole:角色
  • flushDiskType:磁盘刷新方式
  • listenPort:对外开放端口

在 192.168.80.138 服务器上修改broker-b.properties的配置信息

这是主节点

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

在 192.168.80.138 服务器上修改broker-a-s.properties的配置信息

这是从节点

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

启动集群

启动NameServe集群---两台服务器都需要启动

mqnamesrv &

启动Broker集群

在 192.168.80.137 上启动master1和slave2

遇到三个问题有

  1. java.lang.NullPointerException: Cannot invoke "org.apache.rocketmq.store.MessageStore.getMessageStoreConfig()" because the return value of "org.apache.rocketmq.broker.BrokerController.getMessageStore()" is null

说是获取文件目录为空,这是因为我们设置的 store 目录并没有写的权限,需要设置权限

sudo chmod -R 777 /var/opt/rocket-mq/store
  1. java.lang.NullPointerException: Cannot invoke "org.apache.rocketmq.store.timer.TimerMessageStore$TimerFlushService.shutdown()" because "this.timerFlushService" is null

在配置文件中,定义了 abort 和 checkpoint 两个文件夹,其实这两个不需要创建文件夹,它们是文件

  1. java.lang.RuntimeException: Lock failed,MQ already started

这是因为每一个broker 都应该有自己的仓库。重新设定一个

启动 -C 是指定配置文件

服务器1 broker master1

mqbroker -c /var/opt/rocket-mq/conf/2m-2s-sync/broker-a.properties &

服务器1 broker slave2

mqbroker -c /var/opt/rocket-mq/conf/2m-2s-sync/broker-b-s.properties &

image.png

在 192.168.80.138 上启动master1和slave2

mqbroker -c /var/opt/rocket-mq/conf/2m-2s-sync/broker-a-s.properties &
mqbroker -c /var/opt/rocket-mq/conf/2m-2s-sync/broker-b.properties &