掘金 后端 ( ) • 2024-04-28 13:46

背景是这样的:在一个名为"幸运大转盘"的线上活动中,用户可以通过消耗一定的积分来参与抽奖,有机会赢取各种奖品。这个活动的后台系统使用了消息队列(MQ)来处理用户的抽奖请求和发送中奖消息。

一天,有一个名叫小明的用户在活动中抽中了一台新的iPhone。然而,就在他抽中奖品的同时,后台系统的业务代码出现了一个严重的错误,导致抽奖请求的处理过程中出现了问题,最终导致了一个回滚操作。

虽然回滚操作成功地撤销了小明的抽奖请求,但是由于消息队列的特性,中奖消息已经被发送出去,小明收到了一条中奖消息。最后来兑奖的时候产生了这样的一个闹剧。

这样的业务其实在我们开发过程中可以说很平常了,当在一个方法中即有业务代码和发送mq消息的时候,就会出现这样的情况,那么,我们如何避免这样的情况出现呢,这个时候就用到了RocketMQ里面的事务消息了。

我们先来看一张官方关于事务消息的流程图

在这里插入图片描述

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至 RocketMQ Broker^1
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息^2
  3. 生产者开始执行本地事务逻辑^3
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下^4
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查^5
  2. note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置^6 事务消息回查步骤如下:
  3. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果^7
  4. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理^8

其实博主之前已经写过一篇RocketMq的消息基本使用文章,但是里面没有讲详细,所以这篇文章是更详细的MQ事务消息讲解:

【Spring Cloud Alibaba】RocketMQ的基础使用,如何发送消息和消费消息

使用rocketmq-client-java

我们先来一个不使用rocketmq-spring-boot-starter,使用原生的rocketmq-client-javaSDK来讲解一下mq的事务消息。

		 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.0</version>
        </dependency>

我们先来看代码:

package com.masiyi.provider.service;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr("120.76.201.118:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

创建一个RocketMQ事务消息生产者,发送10条带有不同标签的事务消息,它使用自定义的TransactionListenerImpl来处理和检查本地事务状态。可以看到他使用for循环一共发送了10条消息,那么我们运行一下看结果:

在这里插入图片描述

可以看到我们的控制台最终只有三条信息发送成功了,我们可以看到结果是其他的7条消息都没有发送成功,也就是进行了回滚操作。

我们来看一下我们的代码

main方法

  1. TransactionListener transactionListener = new TransactionListenerImpl(); - 创建一个事务监听器的实例。这个监听器需要实现TransactionListener接口,以处理事务消息的三种状态:提交、回滚和未知。

  2. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); - 创建一个事务消息的生产者。"please_rename_unique_group_name"是生产者的组名,应该是唯一的。

  3. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {...}); - 创建一个线程池,用于处理事务消息的检查。这个线程池的大小是2-5,任务队列的大小是2000。

  4. producer.setExecutorService(executorService); - 设置生产者的线程池。

  5. producer.setTransactionListener(transactionListener); - 设置生产者的事务监听器。

  6. producer.setNamesrvAddr("120.76.201.118:9876"); - 设置生产者的NameServer地址。

  7. producer.start(); - 启动生产者。

  8. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; - 创建一个标签数组。

  9. for (int i = 0; i < 10; i++) {...} - 发送10条事务消息。每条消息都有一个不同的标签,以便于在消费者端进行过滤。

  10. for (int i = 0; i < 100000; i++) {...} - 休眠100000秒,以便于在这段时间内处理事务消息的状态。

  11. producer.shutdown(); - 关闭生产者。

TransactionListenerImpl类

TransactionListener接口是Apache RocketMQ中用于处理事务消息的接口,它有两个方法:executeLocalTransactioncheckLocalTransaction

  1. private AtomicInteger transactionIndex = new AtomicInteger(0); - 创建一个原子整数,用于生成事务索引。

  2. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); - 创建一个线程安全的哈希表,用于存储本地事务的状态。

  3. executeLocalTransaction方法 - 这个方法在发送事务消息时被调用,用于执行本地事务。在这个方法中,我们首先获取一个事务索引,然后根据这个索引的值来模拟本地事务的执行结果。最后,我们将事务的状态(0、1或2)存储在localTrans哈希表中,并返回LocalTransactionState.UNKNOW,表示本地事务的状态未知。

  4. checkLocalTransaction方法 - 这个方法在检查本地事务的状态时被调用。在这个方法中,我们首先从localTrans哈希表中获取事务的状态,然后根据这个状态来返回相应的LocalTransactionState。如果事务的状态未知,我们返回LocalTransactionState.UNKNOW;如果事务已经提交,我们返回LocalTransactionState.COMMIT_MESSAGE;如果事务已经回滚,我们返回LocalTransactionState.ROLLBACK_MESSAGE

我们再回到之前的图中,在这个main方法里面我们只做了1^12^23^3个步骤也就是到了producer.sendMessageInTransaction(msg, null);的时候,我们才会调用执行本地事务,也就是这段代码:

  @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

而到了这个步骤,就是步骤4^4了, 可以看方法的名字就知道这是我们的本地事务方法:executeLocalTransaction,我们可以把本地事务写到这里,例如在我们的案例中,我们可以把保存中奖信息的操作放入到这个方法里面,通过手动控制事务的方式控制业务代码的提交和返回消息的结果

	COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW;

如果本地返回的结果(二次确认结果)为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。 如果是上面两个状态,则不会执行后面的步骤了。但是我们可以看到我们的代码里面返回本地事务的状态都是UNKNOW,所以说后面还会触发5^5和6^6这两个步骤,即示例里面的代码:

  @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }

可以看到这里面根据status的值去判断是否是commit或者rollback。结合我们的业务来看,这个里面就可以通过查询数据库看有没有这条小明的信息或者看他有没有中奖来返回对于的commit或者rollback即步骤7^7,最后执行步骤8^8

经过这个一系列的操作,我们就可以把消息和业务绑定起来,即业务回滚了消息也不回正常发出去,这样就不会出现我们的标题所示的情况了。

使用rocketmq-spring-boot-starter

那么如果我们使用rocketmq-spring-boot-starter又该如何配置呢?那么就请移步RocketMQ的基础使用,如何发送消息和消费消息:发送事务消息 里面的功能和我们上面原生的操作方法是一模一样的,只不过里面加了一个策略模式去区分不同的主题topic。

那么本文仓库已经开源,地址就是:

RocketMq的使用