掘金 后端 ( ) • 2024-05-10 09:34

关注微信公众号 “程序员小胖” 每日技术干货,第一时间送达!

引言

在当今云计算和微服务架构大行其道的时代,分布式系统成为了构建高可用、高性能应用的基石。然而,随着系统规模的扩张,数据的一致性问题如同幽灵般萦绕在每位架构师心头,尤其是分布式事务处理中的挑战更是首当其冲。今天,让我们一起深入探索分布式事务模型中的“最终一致性”,揭开它那既神秘又强大的面纱。

分布式事务的挑战与背景

想象一下双十一购物节,数百万用户同时下单,订单系统、库存系统、支付系统等多个服务间需要协同完成交易。采用最终一致性模型,即使瞬间请求激增导致部分操作延迟,系统也能确保在合理的时间框架内调整库存、确认订单状态,从而维持整体业务流程的顺畅。

最终一致性?

在分布式系统中,最终一致性是一种事务模型,它保证系统中的所有数据副本最终会达到一致的状态,但不保证立即的一致性。这种模型允许在数据复制过程中存在短暂的不一致状态,但随着时间的推移,系统会通过各种机制确保数据最终达到一致。

实现策略

补偿事务(TCC)

TCC,即Try-Confirm-Cancel,是一种通过预先定义的确认和取消操作来保证事务最终一致性的模式。

**Try 阶段:**调用 Try 接口,尝试执行业务,完成所有业务检查,预留业务资源。 Confirm 或 Cancel 阶段:两者是互斥的,只能进入其中一个,并且都满足幂等性,允许失败重试。

**Confirm 操作:**对业务系统做确认提交,确认执行业务操作,不做其他业务检查,只使用 Try 阶段预留的业务资 源。

**Cancel 操作:**在业务执行错误,需要回滚的状态下执行业务取消,释放预留资源。

转账场景示例:


//Account类代表一个账户,拥有冻结、解冻、存款和取款的方法。Money类代表金额。
public class AccountService {

    // Try阶段:检查账户余额并冻结资金
    public boolean prepareTransfer(Account source, Account target, Money amount) {
        if (source.getBalance() < amount) {
            return false; // 余额不足
        }
        source.freeze(amount); // 冻结资金
        return true;
    }

    // Confirm阶段:实际转账操作
    public void confirmTransfer(Account source, Account target, Money amount) {
        source.withdraw(amount); // 从源账户扣除金额
        target.deposit(amount); // 向目标账户增加金额
    }

    // Cancel阶段:回滚操作,解冻资金
    public void cancelTransfer(Account source, Account target, Money amount) {
        source.unfreeze(amount); // 解冻资金
    }
}

注意事项

**幂等性:**确保Try、Confirm和Cancel操作都是幂等的,以支持重复执行而不会引起副作用。

**空回滚:**系统应能够处理“空回滚”的情况,即Cancel操作被调用,但Try操作并未实际执行。

**防悬挂:**确保系统能够处理悬挂操作,即Try操作在网络延迟后到达,而Cancel操作已经执行。

本地消息表

本地消息表的方案最初是由 ebay 的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理,通过消息日志的方式来异步执行。本地消息表是一种业务耦合的设计,消息生产方需要额外建一个事务消息表,并记录消息发送状态,消息消费方需要处理这个消息,并完成自己的业务逻辑,另外会有一个异步机制来定期扫描未完成的消息,确保最终一致性。

实战代码示例:

  1. 系统收到下单请求,将订单业务数据存入到订单库中,并且同时存储该订单对应的消息数据,比如购买商品的 ID 和数量,消息数据与订单库为同一库,更新订单和存储消息为一个本地事务,要么都成功,要么都失败。
   @Service
    public class OrderService {

        @Resource
        private OrderMapper orderMapper;
        
        @Resource
        private OrderMessageMapper orderMessageMapper;

        @Autowired
        private MessageProducer messageProducer; // 消息队列的发送器

        @Transactional
        public void placeOrder(Order order, OrderMessage orderMessage) {
            // 将订单业务数据存入到订单库中
            int orderRows = orderMapper.insert(order);

            // 同时存储该订单对应的消息数据
            int messageRows = orderMessageMapper.insert(orderMessage);

            // 确保订单和消息数据都成功插入
            if (orderRows == 1 && messageRows == 1) {
                // 发送库存更新消息到消息队列
                messageProducer.sendMessage(orderMessage);
            } else {
                // 如果任何插入失败,抛出异常以回滚事务
                throw new RuntimeException("Order or message data insertion failed");
            }
        }
    }
  1. 库存服务更新
    @Autowired
    private InventoryDomainService inventoryDomainService;

    @Override
    public boolean consume(MqMessageEntity<OrderMessage> mqMessageEntity) {
        log.info("接收订单支付完成请求,扣件库存:{}", JSON.toJSONString(mqMessageEntity));
        return inventoryDomainService.deductionInventory(mqMessageEntity);
    }
  1. 订单服务更新本地消息表
        @Autowired
        private OrderMessageMapper orderMessageMapper;

        public void sendMessage(OrderMessage orderMessage) {
            // 向消息队列发送库存更新消息
            // 消息发送成功的回调中更新本地消息表状态
            orderMessageMapper.upodateMessageStatus(orderMessage);
        }
  1. 异步任务重试机制 使用Spring的@Scheduled注解来定时触发异步任务。这个地方用任何调度计划都可以实现 我用的是spring自带的@Scheduled注解实现的。异步技术也可以根据自己的情况选择。

@Component
public class MessageRetryScheduler {

    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedRate = 60000) // 每60秒执行一次
    public void scheduleMessageRetry() {
        messageProducer.scanAndRetryUnsentMessages();
    }
}

    @Autowired
    private OrderMessageMapper orderMessageMapper;
    
    @Async
    public void scanAndRetryUnsentMessages() {

        List<OrderMessageDO> unsentMessages = orderMessageMapper.queryByStatus("PENDING");
        
        for (OrderMessage message : unsentMessages) {
            try {
                sendMessage(message); // 重试发送消息
                orderMessageMapper.updateStatus(message);
            } catch (Exception e) {
                // 可以选择更新状态为错误或其他逻辑
                orderMessageMapper.updateStatus(message);
            }
        }
    }

RocketMQ 事务消息

RocketMQ 事务消息是一种支持分布式事务的消息。它通过引入 prepare、commit 和 rollback 三个阶段,来确保事务消息的一致性。

**prepare 阶段:**消息发送方发送半消息,此时消息的状态为“待提交”。

**commit 阶段:**消息发送方向 RocketMQ 发送 commit 消息请求,RocketMQ 判断此时半消息是否被确认,如果半消息已被确认,则将消息标记为“可消费”并提交事务。如果半消息未被确认,则将消息标记为“不可消费”并终止事务。

**rollback 阶段:**消息发送方向 RocketMQ 发送 rollback 消息请求,RocketMQ 将半消息标记为“不可消费”并回滚 事务。

代码示例:

创建并初始化一个事务消息生产者:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,并指定NameServer地址
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");

        // 指定事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务逻辑,例如数据库操作
                // 假设执行成功,返回Commit状态
                return LocalTransactionState.CommitMessage;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(Message msg) {
                // 检查本地事务状态,确认是否需要提交或回滚
                // 这里可以根据业务逻辑来实现检查
                // 假设检查通过,返回Unknown状态,让消息服务决定是提交还是回滚
                return LocalTransactionState.Unknown;
            }
        });

        // 启动生产者
        producer.start();

        // 创建消息
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

        // 发送事务消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);

        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

在代码示例中我们实现了TransactionListener接口的两个方法:

**executeLocalTransaction:**执行本地事务逻辑,返回事务状态。如果本地事务执行成功,返回CommitMessage;如果执行失败,返回RollbackMessage。

**checkLocalTransaction:**检查本地事务状态。如果事务状态未知,返回Unknown,让消息队列服务决定是提交还是回滚消息。

最大努力通知

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低 的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。 最大努力通知型的实现方案,一般符合以下特点:

  1. 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失(不可靠消息)。
  2. 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息

代码示例:

发送通知

@Service
public class NotificationService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendNotification(String message) {
        // 发送通知消息到MQ
        rabbitTemplate.convertAndSend("notificationExchange", "notificationRoutingKey", message);
    }
}

监听消息并处理

@Component
public class NotificationListener {

    @RabbitListener(queues = "notificationQueue")
    public void handleNotification(String message) {
        // 处理消息,例如更新库存
        processNotification(message);

        // 确认消息已处理
        acknowledgeMessage();
    }

    private void processNotification(String message) {
        // 业务逻辑处理
    }

    private void acknowledgeMessage() {
        // 向发送方确认消息已处理的逻辑
    }
}

重试机制通常由消息中间件提供,如RabbitMQ的死信队列和重试策略。校对机制可能需要额外的接口和逻辑来实现。

结语

最终一致性作为分布式系统中一种重要的事务处理哲学,它在实践中展现出了强大的生命力。然而,没有银弹存在,每种模型都有其适用场景与局限。作为技术探索者,我们应当持续思考如何更精细地控制一致性级别,结合业务特性量体裁衣,设计出既能满足业务需求又能保持系统弹性的解决方案。那么,您在实际项目中遇到过哪些分布式事务的挑战?对于最终一致性模型又有何独到见解或疑问呢?欢迎留言讨论,共同推进技术的边界。