掘金 后端 ( ) • 2024-04-26 16:44

theme: lilsnake highlight: a11y-dark

RocketMQ可视化管理页面搭建(rocketmq-dashboard)

源码安装

https://github.com/apache/rocketmq-dashboard

拉取项目后打开

在yml 文件中,配置集群管理

rocketmq:
  config:
    # 如果该值为空,则使用 env 值 rocketmq.config.namesrvAddr NAMESRV_ADDR |现在,默认 localhost:9876
    # 配置多个 NameRV 地址以管理多个不同的集群
    namesrvAddrs:
      - 192.168.80.137:9876
      - 192.168.80.138:9876

启动就可以了,默认 8080 端口

消息发送样例

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.6</version>
</dependency>

消息发送者步骤分析

  1. 创建客户端服务提供者
  2. 构建配置类,指定 proxy 代理地址
  3. 获取生产者 producer,绑定主题
  4. 创建消息对象,指定主题Topic、Tag 和消息体
  5. 发送消息
  6. 关闭生产者producer

消息消费者步骤分析

  1. 创建消费者Consumer,制定消费者组名
  2. 指定Nameserver地址
  3. 订阅主题Topic和Tag
  4. 设置回调函数,处理消息
  5. 启动消费者consumer

同步发送消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

// 设置要发送消息的 Topic
String topic = "yourNormalTopic";

// 获取服务提供者实例
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// 配置 RocketMQ 的通信地址信息
ClientConfiguration configuration = ClientConfiguration.newBuilder()
        .setEndpoints("192.168.80.138:28080")  // 设置 RocketMQ 的服务端地址和端口
        .build();

// 创建 Producer 实例,设置通信配置和预绑定的 Topic
final Producer producer = provider.newProducerBuilder()
        .setTopics(topic)  // 设置要发送消息的 Topic
        .setClientConfiguration(configuration)  // 设置通信配置
        .build();

// 创建要发送的消息
Message message = provider.newMessageBuilder()
        .setTopic(topic)  // 设置消息的 Topic,必须与 Producer 的 Topic 一致
        .setKeys("随便设置的")  // 设置消息的 Key,用于唯一标识消息,可检索
        .setTag("标签")  // 设置消息的 Tag,用于消息过滤
        .setBody("这是数据".getBytes())  // 设置消息的内容,以字节数组形式
        .build();

// 发送消息,并获取发送结果
SendReceipt sendReceipt = producer.send(message);
// 打印发送成功的消息 ID
sendReceipt.getMessageId();  

producer.close();  // 关闭 Producer 实例,释放资源

除了能获取ID信息等,还可以获取

// 发送消息,并获取发送结果
SendReceiptImpl sendReceipt = (SendReceiptImpl) producer.send(message);
// 获取消息发送结果中的相关信息
// 获取消息发送的目标 Broker 的地址信息 ipv4:192.168.80.138:28081
Endpoints endpoints = sendReceipt.getEndpoints();
// 获取消息的事务 ID,用于跟踪消息的事务状态 01FA5177190D9717580638BAF400000000
String transactionId = sendReceipt.getTransactionId();
// 获取消息发送到的消息队列信息
MessageQueueImpl messageQueue = sendReceipt.getMessageQueue();
// 获取消息在消息队列中的偏移量(Offset)
long offset = sendReceipt.getOffset();

异步信息发送

步骤:创建异步 fluter,创建线程池,回调函数接受结果

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

// 消息
String topic = "yourNormalTopic";
// 获取服务提供者
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 配置信息,地址等
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080").build();
// 获取生产者--初始化Producer时需要设置通信配置以及预绑定的Topic
final Producer producer = provider.newProducerBuilder()
        .setTopics(topic)
        .setClientConfiguration(configuration)
        .build();
// 创建普通信息
Message message = provider.newMessageBuilder()
        .setTopic(topic)
        .setKeys("123123123")
        .setTag("测试异步")
        .setBody("为什么我的心是甜的".getBytes())
        .build();


// 使用 CompletableFuture 异步发送消息
CompletableFuture<SendReceipt> sendAsync = producer.sendAsync(message);
// 创建用于执行异步回调的线程池
ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
// 注册异步回调函数,处理发送结果或异常
sendAsync.whenCompleteAsync(new BiConsumer<SendReceipt, Throwable>() {
    @Override
    public void accept(SendReceipt sendReceipt, Throwable throwable) {
        if (null != throwable) {
            // 发送过程中出现异常
            System.out.println("发送消息出错:" + throwable.getMessage());
        }
        // 发送成功,处理发送结果
        System.out.println("异步发送成功,消息ID:" + sendReceipt.getMessageId());
    }
}, sendCallbackExecutor);

消费信息

MessageViewImpl 是消息对象

final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080").build();

// 创建过滤器,用于只消费满足条件的消息,这里过滤 tag,并且内容是 标签
FilterExpression filterExpression = new FilterExpression("标签", FilterExpressionType.TAG);

// 在大多数情况下,不需要创建太多的消费者,建议使用singleton模式
PushConsumer consumer = provider.newPushConsumerBuilder()
        // 指定配置文件
        .setClientConfiguration(configuration)
        // 设置消费者组名称
        .setConsumerGroup("group1")
        // 设置消费者订阅的主题,以及过滤的表达式
        .setSubscriptionExpressions(Collections.singletonMap("yourNormalTopic", filterExpression))
        // 监听消费状态
        .setMessageListener(new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                // 强转
                MessageViewImpl messageView1 = (MessageViewImpl) messageView;
                System.out.println(messageView1);
                // 处理收到的消息并返回消费结果。
                System.out.println(1111);
                // 返回成功
                return ConsumeResult.SUCCESS;
            }
        })
        .build();
System.out.println(1111);
// 这里必须阻塞,否则还没有消费就关闭了
Thread.sleep(Long.MAX_VALUE);
consumer.close();

MessageViewImpl 有哪些信息呢?

  1. getBody 返回 ByteBuf对象

转换成字符串

StandardCharsets.UTF_8.decode(messageView1.getBody()).toString()
  1. getMessageId

获取消息ID

  1. getTopic

获取主题

  1. getTag

获取标签

  1. getMessageGroup

获取消息组

  1. getDeliveryTimestamp

延迟执行时间

广播模式

含义:同一个主题的信息可以被多个消费者分别消费。

image.png

只需要启动两个消费者组,分别监听同一个主题的消息,进行消费即可

消费者负载均衡---消息粒度负载均衡

对于 PushConsumerSimpleConsumer 类型的消费者,默认且仅使用消息粒度负载均衡策略。

同一个消费者组的消费者将平均处理同一个队列中的信息

image.png

消费者获取某条消息之后,服务端会将该消息加锁,保证其他消费者对该信息不可见,直到该消息成功被消费或者超时

重点:

  1. 如果是多个群组同时处理一个主题信息的话,是会接收到的
  2. 如果是单组的话,只会有一个消费者接收

只需要启动两个消费者组,组名一定要一致,分别监听同一个主题的消息,进行消费即可

但是我想让数据顺序接受怎么办?

因为负载均衡策略,默认是随机的。我们使用的是同步发送消息或者是异步消息发送,下面我们讲到顺序发送,这样消费的时候同样也是顺序消费了。

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序

原理解析:

在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

注意事项:多个消息组向一个队列发送消息,是不能保证顺序的一致性的。

做之前呢,我们要创建一个 顺序类型的 Topic,这个我之前在可视化管理页面的里面没有找到创建,所以只能用命令来做处理了。

-c 是之前集群,-t 指定主题名,-o 顺序类型 -n 是地 -a 是更新内容,更新 message的type类型为 FIFO

mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO

测试代码

生产者

ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080")
        .build();
// 主题类型必须是顺序类型
String topic = "FIFOTopic";
Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).setTopics(topic).build();

for (int i = 0; i < 30; i++) {
    Message message = provider.newMessageBuilder().setTopic(topic).setTag("广播").setKeys("22222222")
            .setBody(("hello--" + i).getBytes())
            // 设置群组,必须设置群组。
            .setMessageGroup("group3")
            .build();
    producer.send(message);
}
producer.close();

消费者

和之前一样,即可。

延迟消息

顾名思义:就是延迟一定的时间发送消息

上面顺序中的顺序主题是定义的,那么延迟也是一样的啦!

定义延迟主题

mqadmin updateTopic -c rocketmq-cluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY

生成者

通过 .setDeliveryTimestamp(deliveryTime) 设置延迟时间

ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080")
        .build();

// 主题类型必须是延迟类型
String topic = "DelayTopic";
Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).setTopics(topic).build();

for (int i = 0; i < 10; i++) {
    // 1 分钟
    long deliveryTime = System.currentTimeMillis() + 1 * 60 * 1000;
    Message message = provider.newMessageBuilder().setTopic(topic).setTag("延迟").setKeys("22222222")
            .setBody(("hello--" + i).getBytes())
            // 设置延迟时间
            .setDeliveryTimestamp(deliveryTime)
            .build();
    producer.send(message);
}
producer.close();

关于设置时间这里还有一种方式

// 设置时间,用当前时间戳 + 上即可。---设置十秒
Duration duration = Duration.ofSeconds(10);
long deliveryTime = System.currentTimeMillis() + duration.toMillis();

消费者一样

过滤消息

消费者进行消费时,可以过滤出自己想要的消息,比如根据 tag 过滤和 sql 过滤

tag 过滤

通过 FilterExpression 类设置过滤条件和类型,标签情况下,可以用 || 表示消费多个标签,接受所有用 *

final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080").build();

// 设置 tag1,类型为标签
FilterExpression filterExpression = new FilterExpression("tag1 || tag2", FilterExpressionType.TAG);

PushConsumer consumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(configuration)
        // 设置主题和过滤类
        .setSubscriptionExpressions(Collections.singletonMap("TopicTest2", filterExpression))
        .setConsumerGroup("group3")
        .setMessageListener(messageView -> {
            MessageViewImpl messageView1 = (MessageViewImpl) messageView;
            System.out.println(StandardCharsets.UTF_8.decode(messageView1.getBody()));
            return ConsumeResult.SUCCESS;
        })
        .build();
Thread.sleep(Long.MAX_VALUE);
consumer.close();

生产者只需要设置标签即可。

Message message = provider.newMessageBuilder().setTopic(topic)
    // 设置标签
    .setTag("tag2")
    .setKeys("22222222").setBody(("hello").getBytes()).build();

sql 过滤

语法 说明 示例 IS NULL 判断属性不存在 a IS NULL :属性a不存在 IS NOT NULL 判断属性存在 a IS NOT NULL:属性a存在 > >= < <= 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错 说明 可转化为数字的字符串也被认为是数字 a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100 BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx,表示属性值在两个数字之间 a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100 NOT BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外 a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100 IN (xxx, xxx) 表示属性的值在某个集合内。集合的元素只能是字符串 a IS NOT NULL AND (a IN ('abc', 'def')):属性a存在且属性a的值为abc或def = <> 等于和不等于。可用于比较数字和字符串 a IS NOT NULL AND (a = 'abc' OR a<>'def'):属性a存在且属性a的值为abc或a的值不为def AND OR 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内 a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。

生产者

通过 addProperty 方法添加自定义属性

ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080")
        .build();

String topic = "TopicTest2";
Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).setTopics(topic).build();

for (int i = 0; i < 10; i++) {
    Message message = provider.newMessageBuilder().setTopic(topic).setTag("tag2")
            .setKeys("22222222").setBody(("hello--" + i).getBytes())
            // 添加自定义属性用于 sql 过滤
            .addProperty("i", String.valueOf(i))
            .build();
    producer.send(message);
}
producer.close();

消费者

FilterExpression 设置类型为 sql 过滤,并且条件是自定义属性 > 5

final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080").build();

// 设置 sql 过滤,消费自定义属性 > 5
FilterExpression filterExpression = new FilterExpression("i > 5", FilterExpressionType.SQL92);

PushConsumer consumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(configuration)
        // 设置主题和过滤类
        .setSubscriptionExpressions(Collections.singletonMap("TopicTest2", filterExpression))
        .setConsumerGroup("group3")
        .setMessageListener(messageView -> {
            MessageViewImpl messageView1 = (MessageViewImpl) messageView;
            System.out.println(StandardCharsets.UTF_8.decode(messageView1.getBody()));
            return ConsumeResult.SUCCESS;
        })
        .build();
Thread.sleep(Long.MAX_VALUE);
consumer.close();

事务消息

事务消息.png

事务:消息发送到 mq 之后,这个时候消息是不可以被消费者知道的,需要 commit 提交之后,消费者才能去消费消息。

  • 半消息:消息发送到 mq,但是没有提交
  • commit:将半消息提交
  • rollback:将消息回滚,重新处理

我们在提交消息或者回滚消息的时候,没有对半消息进行处理,这时候,mq 会检查消息的状态,触发本地的检查方法,我们本地的检查方法,再对消息进行提交或者回滚。

创建事务主题

mqadmin updatetopic -n localhost:9876 -t TestTopic -c rocketmq-cluster -a +message.type=TRANSACTION &

生产者

设置事务检查--用于处理没有结果的数据,比如发送了,但是未提交或者未回滚事务。

ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder().setEndpoints("192.168.80.138:28080")
        .build();

String topic = "TestTopic";

// 设置事务检查器
Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).setTopics(topic)
        // 设置事务检查器
        .setTransactionChecker(new TransactionChecker() {
            @Override
            public TransactionResolution check(MessageView messageView) {
                System.out.println("事务检查---" + messageView.getTag());
                // TransactionResolution.UNKNOWN 每隔一段时间检查一次,最高 16次。
                return TransactionResolution.COMMIT;
            }
        }).build();

// 开启事务
Transaction transaction = producer.beginTransaction();

Message message = provider.newMessageBuilder().setTopic(topic).setTag("tag2")
        .setKeys("22222222").setBody(("hello").getBytes()).build();
// 携带事务发送
SendReceiptImpl sendReceipt = (SendReceiptImpl) producer.send(message, transaction);

// 模拟延迟提交,等待一段时间,使其触发事务检查
Thread.sleep(150000);

// 假设失败了,执行回滚
//        transaction.commit();
// 或者回滚
//         transaction.rollback();
System.out.println("生产者启动!");
//        producer.close();