分布式事务原理与实践

概述

分布式事务是分布式架构中的核心挑战之一,尤其在跨服务、跨数据库操作时保证数据一致性。在传统的单体应用(Monolithic App)中不同的模块,在同一个数据源上更新数据来完成一项业务,整个过程的数据一致性可以由数据库的本地事务来保证。随着业务需求和架构的变化,单体应用进行了服务化拆分,原来的多个模块被拆分为多个独立的服务,每个服务使用独立的数据源(Pattern: Database per service)。整个业务过程将由多个服务的调用来完成。此时,每个服务自身的数据一致性仍有本地事务来保证,但是整个业务层面的全局数据一致性要如何保障呢?这就是分布式系统所面临的典型分布式事务需求:分布式系统需要一个解决方案来保障对所有节点操作的数据一致性,这些操作组成一个分布式事务,要么全部执行,要么全部不执行。

CAP理论:
CAP 定理(Consistency、Availability、Partition Tolerance Theorem),也称为 Brewer 定理,起源于在 2000 年 7 月,是加州大学伯克利分校的 Eric Brewer 教授于ACM 分布式计算原理研讨会(PODC) 上提出的一个猜想。

  • 一致性(Consistency) :指的是客户端的每次读操作,不管访问哪个节点,要么读到的都是同一份最新数据,要么读取失败。
  • 可用性(Availability) :指的是客户端的请求,不管访问哪个节点,都能得到响应数据,但不保证是同一份最新数据。即我尽力给你返回数据,不会不响应你,但是我不保证每个节点给你的数据都是最新的,这个指标强调的是服务可用,但不保证数据的一致。
  • 分区容错性(Partition tolerance) :指的是当节点间出现消息丢失、高延迟或者已经发生网络分区时,系统仍然可以继续提供服务。也就是说,分布式系统在告诉访问本系统的客户端: 不管我的内部出现什么样的数据同步问题,我会一直运行,提供服务 。

在CAP理论中,分布式系统不可能同时满足以下三种,最多只能同时满足其中的两项,这是因为在分布式环境中网络分区是必然存在的,对于一个分布式系统而言,一致性(Consistency)、可用性(Availability)、分区容错性(Partition Tolerance)3 个指标不可兼得,只能在 3 个指标中选择 2 个 :

  • CP (一致性与分区容错): 放弃可用性。当发生网络分区时,为了保证数据一致性,系统会停止服务,等待分区恢复或数据同步完成。例如,ZooKeeper、etcd。
  • AP (可用性与分区容错): 放弃强一致性。当发生网络分区时,系统会继续提供服务,但可能返回不一致的数据。当分区恢复后,系统会最终同步数据达到一致。例如,一些 NoSQL 数据库(如 Cassandra、DynamoDB)、大部分注册中心在设计上偏向 AP。
  • CA(一致性与可用性): 放弃分区容错,在没有网络分区时表现良好,但无法处理分区故障。例如:单机数据库、传统RDBMS集群

事实上,在不存在网络分区的情况下也就是分布式系统正常运行时,C 和 A 能够同时保证。只有当发生分区故障的时候,也就是说需要 P 时,才会在 C 和 A 之间做出选择。

BASE理论
在CAP理论中,我提到分布式系统理论上只能取 CP 或 AP,如果要实现强一致性必然会影响可用性。但是,大多数系统实际上不需要那么强的一致性,而是更关注可用性,所以生产环境,大多数系统都会采用可用性优先的 AP 模型。Base 理论是 CAP 理论中的 AP 的延伸,是对互联网大规模分布式系统的实践总结,强调可用性。BASE理论 是 基本可用(Basically Available) 、 软状态(Soft-state) 和 最终一致(Eventually Consistent) 三个短语的缩写:

  • Basically Available(基本可用):当分布式系统在出现不可预知的故障时,允许损失部分功能的可用性,保障核心功能的可用性。基本可用在本质上是一种妥协,也就是在出现节点故障或系统过载的时候,通过牺牲非核心功能的可用性,保障核心功能的稳定运行 。
  • Soft state(软状态):描述的是实现服务可用性的时候系统数据的一种过渡状态,也就是说不同节点间,数据副本存在短暂的不一致。比如,分布式存储中一般一份数据至少会有N个副本,允许系统在不同节点的数据副本之间进行数据同步的过程中存在延时。
  • Eventually consistent(最终一致性):分布式系统即使无法做到强一致性,但应当根据自身业务特点,采用适当的方式在一定时限后使各个节点的数据最终能够达到一致的状态。这个时限取决于网络延时,系统负载,数据复制方案设计等等因素。几乎所有的互联网系统采用的都是最终一致性,只有在实在无法使用最终一致性,才使用强一致性或事务。一般来说,在实际工程实践中有这样几种方式:
  • 读时修复: 在读取数据时,检测数据的不一致,进行修复。
  • 写时修复: 在写入数据时,检测数据的不一致,进行修复。
  • 异步修复: 这个是最常用的方式,通过定时对账检测副本数据的一致性,并修复。

因为写时修复不需要做数据一致性对比,性能消耗比较低,对系统运行影响也不大,所以许多开源框架都是用这种方式实现最终一致性的。而读时修复和异步修复因为需要做数据的一致性对比,性能消耗比较多,所以需要尽量优化一致性对比的算法,降低性能消耗,避免对系统运行造成影响。

小结
BASE 理论是对 CAP 中一致性和可用性权衡的结果,它来源于对大规模互联网分布式系统实践的总结,是基于 CAP 定理逐步演化而来的。它的核心思想是: 如果不是必须的话,不推荐实现事务或强一致性,鼓励可用性和性能优先,根据业务的特点,来实现非常弹性的基本可用,以及数据的最终一致性 。

分布式事务实现方案

在CAP理论中CAP不可能同时满足三个条件,我们必须要有取舍,在事务中应该遵循ACID,对数据要求强一致性,那么我们必须选择 CP——强一致性 ,即刚性事务

  • 2PC (两阶段提交)
  • 3PC(三阶段提交)

刚性事务指的是强一致性,基础是XA协议,XA协议是一个基于数据库的分布式事务协议,其分为两部分:事务管理器(Transaction Manager)本地资源管理器(Resource Manager)。事务管理器作为一个全局的调度者,负责对各个本地资源管理器统一号令提交或者回滚。相对于刚性事务还有柔性事务(AP + BASE): 柔性事务追求的是最终一致性:

  • TCC
  • Saga
  • 本地消息表
  • MQ事务方案
  • 最大努力通知

2PC(两阶段提交)

为了解决分布式事务的一致性问题,X/Open组织(后来并入了The Open Group)提出了一套名为X/Open XA(XA 是 eXtended Architecture 的缩写)的处理事务架构,其核心内容是定义了全局的事务管理器(Transaction Manager,用于协调全局事务)和局部的资源管理器(Resource Manager,用于驱动本地事务)之间的通信接口。XA 接口是双向的,能在一个事务管理器和多个资源管理器(Resource Manager)之间形成通信桥梁,通过协调多个数据源的一致动作,实现全局事务的统一提交或者统一回滚。

2PC指的是 Prepare & Commit,2PC 最早是用来实现数据库的分布式事务的,上面提到的 XA 协议是 X/Open 国际联盟基于二阶段提交协议提出的,也叫作 X/Open Distributed Transaction Processing(DTP)模型,比如 MySQL 就是通过 MySQL XA 实现了分布式事务。

  • ApplicationProgram(AP) 应用程序定义了事务边界并指定构成事务的操作。
  • ResourceManager(RM) 资源管理器用来管理需要访问的共享资源,可以理解为关系数据库、文件存储系统、消息队列、打印机等。
  • TransactionManagger(TM) 事务管理器是一个独立的组件,他为事务分配标识符并监视事务的执行情况,负责事务完成和故障恢复。
  • CommunicationResourceManager(CRM) 通信资源管理器控制一个或多个 TM domain 之间分布式应用的通信。

2PC引入一个作为协调者(coordinator)的组件来统一掌控所有参与者(participant)的操作结果,并最终指示这些节点是否要把操作结果进行真正的提交:

  • 协调者节点(coordinator),一般也叫做 事务协调者
  • 参与者节点(participant、cohort),一般也叫做 事务参与者

2PC处理流程:

第一阶段:准备阶段:

  • 协调者向所有参与者发送REQUEST-TO-PREPARE;
  • 当参与者收到REQUEST-TO-PREPARE消息后,它向协调者发送消息PREPARE或者NO,表示事务是否准备好,如果发送是NO,那么事务回滚。

第二阶段:提交阶段

  • 协调者收集所有参与者的返回信息,如果所有参与者都回复PREPARED,那么协调者向所有参与者发送COMMIT消息,否则,协调者发送ABORT消息;
  • 参与者收到协调者发来的Commit消息或Abort消息,它将执行提交或回滚,并向协调者发送DONE消息确认。

两阶段提交的缺点:

  • 网络抖动导致数据不一致:第二阶段协调者向参与者发送commit命令后,如果发生网络抖动,有一部分参与者未收到commit请求,则无法执行事务提交,影响整个系统数据一致性;
  • 超时导致的同步阻塞问题:2PC中所有参与者节点都是事务阻塞型,当一个节点通信超时,其余参与者都会被阻塞;
  • 单点故障的风险:整个过程严重依赖协调者,如果协调者故障,参与者处于锁定资源的状态,无法完成事务commit的操作。即使重新选择一个协调者,也无法解决因前一个协调者宕机导致的阻塞问题;

2PC分布式事务方案,比较适合单体应用跨多库的场景,一般用spring + JTA就可以实现。但是因为严重依赖于数据库层面来搞定复杂的事务,效率很低,所以绝对不适合高并发的场景。虽然是目前分布式事务的事实规范,但实际应用并不多。不过2PC是一种非常经典的思想,Paxos、Raft 等强一致性算法,都采用了二阶段提交操作。


3PC(三阶段提交)

为了缓解两段式提交协议的一部分缺陷,具体地说是协调者的单点问题和准备阶段的性能问题,后续又发展出了三段式提交协议(3PC)。3PC是在2PC的基础上,在第一阶段和第二阶段中插入一个准备阶段,把原本的两段式提交的准备阶段再细分为两个阶段:询问阶段(CanCommit)、准备阶段(PreCommit)以及提交阶段(DoCommit)。一方面新增一个 询问阶段(CanCommit),提前确认下各个参与者的状态是否正常,另一方面引入超时机制,解决资源阻塞问题;

询问阶段: 事务协调者向事务参与者发送 CanCommit 请求,参与者如果可以提交就返回 Yes 响应,否则返回 No 响应。这样的话,询问阶段就可以确保尽早的发现无法执行操作的参与者节点,提升效率。该阶段参与者也不会取锁定资源。

  • 事务协调者发送事务询问指令(canCommit),询问事务参与者是否可以提交事务;
  • 参与者如果可以提交就返回 Yes 响应,否则返回 No 响应,不需要做真正的操作。

对于事务协调者,如果询问阶段有任一参与者返回NO或超时,则协调者向所有参与者发送 abort指令,对于返回NO的参与者,如果在指定时间内无法收到协调者的 abort指令 ,则自动中止事务。

准备阶段: 事务协调者根据事务参与者在询问阶段的响应,判断是执行事务还是中断事务:

  • 如果询问阶段所有参与者都返回YES,则协调者向参与者们发送 预执行指令(preCommit) ,参与者接受到preCommit指令后,写redo和undo日志,执行事务操作,占用资源,但是不会提交事务;
  • 参与者响应事务操作结果,并等待最终指令: 提交(doCommit) 或 中止(abort) 。

提交阶段:

  • 如果每个参与者在准备阶段都返回ACK确认,则协调者向参与者发起 提交指令(doCommit) ,参与者收到指令后提交事务,并释放锁定的资源,最后响应ACK;
  • 如果任意一个参与者在准备阶段返回NO(即执行事务操作失败),或者协调者在指定时间没收到全部的ACK响应,就会发起 中止(abort) 指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源。

当参与者响应ACK后,即使在指定时间内没收到doCommit指令,也会进行事务的最终提交,一旦进入提交阶段,即使因为网络原因导致参与者无法收到协调者的doCommit或Abort请求,超时时间一过,参与者也会自动完成事务的提交。

优点:

  • 增加了一个询问阶段,询问阶段可以确保尽早的发现无法执行操作的参与者节点,提升效率;
  • 在准备阶段成功以后,协调者和参与者执行的任务中都增加了超时,一旦超时,参与者都会继续提交事务,默认为成功,降低了阻塞范围。

缺点:

  • 如果准备阶段执行事务后,某些参与者反馈执行事务失败,但是由于出现网络分区,导致这些参与者无法收到协调者的中止请求,那么由于超时机制,这些参与者仍会提交事务,导致出现不一致;
  • 性能瓶颈,不适合高并发场景。

三阶段提交协议,虽然针对二阶段提交协议的“协调者故障,参与者长期锁定资源”的痛点,通过引入了询问阶段和超时机制,来减少资源被长时间锁定的情况,但这也会导致集群各节点在正常运行的情况下,使用更多的消息进行协商,增加系统负载和响应延迟。也正是因为这些问题,三阶段提交协议很少被使用。

TCC(Try-Confirm-Cancel)

2007年,Pat Helland 发表了一篇名为《Life beyond Distributed Transactions: an Apostate’s Opinion》的论文,提出了 TCC(Try-Confirm-Cancel) 的概念。TCC的核心思想是: 针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作 ,分为三个阶段:

  • Try: 这个阶段对各个服务的资源做检测以及对资源进行锁定或者预留;
  • Confirm : 执行真正的业务操作,不作任何业务检查,只使用Try阶段预留的业务资源,Confirm操作要求具备幂等设计,Confirm失败后需要进行重试;
  • Cancel: 如果任何一个服务的业务方法执行出错,那么这里就需要进行补偿,即执行回滚操作,释放Try阶段预留的业务资源 ,Cancel操作要求具备幂等设计,Cancel失败后需要进行重试。

从实现上来看,TCC仍然是一个两阶段提交协议。只是在执行出现问题的时候,有一定的自我修复能力,如果任何一个事务参与者出现了问题,协调者可以通过执行逆操作来取消之前的操作,达到最终的一致状态。从TCC的执行流程也可以看出,服务提供方需要提供额外的 补偿逻辑 ,那么原来一个服务接口,引入TCC后可能要改造成3种逻辑:

  • Try:先是服务调用链路依次执行Try逻辑;
  • Confirm:如果都正常的话,TCC分布式事务框架推进执行Confirm逻辑,完成整个事务;
  • Cancel:如果某个服务的Try逻辑有问题,TCC分布式事务框架感知到之后就会推进执行各个服务的Cancel逻辑,撤销之前执行的各种操作。

可靠消息队列事务

可靠事件队列(Reliable Event Queue)是一种基于最终一致性的分布式事务解决方案,通过异步事件驱动的方式,结合消息队列的可靠性机制,确保跨服务的事务最终一致。 这个方式避免了像XA协议那样的性能问题。许多开源的消息中间件都支持分布式事务,比如RocketMQ、Kafka,其思想几乎是和本地消息表/服务实一样的,只不过是将可靠消息服务和MQ功能封装在一起,屏蔽了底层细节,从而更方便用户的使用。以RocketMQ为例:

RocketMq分布式事务处理流程:

  • 生产者将消息发送至 RocketMQ 版服务端。
  • RocketMQ 服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息。
  • 生产者开始执行本地事务逻辑。
  • 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  • 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  • 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  • 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期:

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消费重试。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 版默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:RocketMQ 版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

RocketMQ事务消息支持:

  • 消息类型:事务消息仅支持在MessageType为Transaction的主题使用,即事务消息只能发送至类型为事务消息的主题中。
  • 消息消费:RocketMQ事务消息保证生产者本地事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务自行保证消息正确处理,建议消费端做好消费重试。
  • 中间状态:RocketMQ事务消息一致性为最终一致性,即在消息提交到下游消费端处理完成之前,下游和上游事务之间的状态会不一致。因此,事务消息仅适合能接受异步执行的场景。
  • 事务超时:RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

基于RocketMQ实现分布式事务

设计思路:

  • 泛型抽象:使用 TransactionHandler 来封装具体的本地事务逻辑(例如,创建订单、扣减库存)。
  • 数据载体:使用 TransactionArg 作为sendMessageInTransaction的arg参数,将事务ID、业务参数和对应的处理器handler粘合在一起。
  • 统一监听器:实现一个单一、非泛型的TransactionListener。这个监听器不执行任何具体业务,只负责:
    • 从arg(即TransactionArg)中解构出handler。
    • 调用handler.executeLocal(…)来执行具体业务。
    • 基于TransactionLogEntity进行事务状态的持久化和回查。

核心逻辑实现

依赖:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>5.3.1</version>
</dependency>

定义事务参数:

1
2
3
public record TransactionArg<T>(Long txId, T arg, TransactionHandler<T> handler) {

}

定义 TransactionHandler 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 本地事务执行器的顶层接口
* @param <T> 业务参数类型
*/
public interface TransactionHandler<T> {

/**
* 执行本地事务
* @param arg 业务参数
* @throws Exception 抛出异常表示本地事务执行失败,需要回滚
*/
void executeLocal(T arg) throws Exception;

/**
* 获取此事务的业务类型,用于日志记录
* @return 业务类型标识符
*/
String getBizType();
}

RocketMQ 事务监听器(核心)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
* 统一的分布式事务监听器
* * 实现了 'TransactionListener' 接口,用于处理事务消息的两个阶段:
* 1. executeLocalTransaction: 执行本地事务
* 2. checkLocalTransaction: 回查本地事务状态
*/
@Component("distributedTransactionListener")
public class DistributedTransactionListener implements TransactionListener {
private static final Logger log = LoggerFactory.getLogger(DistributedTransactionListener.class);
private static final int MAX_CHECK_RETRIES = 5;

private final TransactionLogRepository logRepository;
private final ObjectMapper objectMapper; // 用于序列化业务参数

// 通过构造函数注入依赖
public DistributedTransactionListener(TransactionLogRepository logRepository, ObjectMapper objectMapper) {
this.logRepository = logRepository;
this.objectMapper = objectMapper;
}

/**
* 第一阶段:执行本地事务
* * 当发送方发送 "Half Message" 成功后,Broker会回调此方法。
* * @param msg Half Message 消息
* @param arg 'sendMessageInTransaction' 中传递的 'arg' 对象,这里是我们设计的 TransactionArg
* @return 本地事务的执行状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 1. 解析参数
if (!(arg instanceof TransactionArg<?> txArg)) {
log.error("[TX] Invalid argument type. Expected TransactionArg, but got {}",
(arg != null ? arg.getClass().getName() : "null"));
return LocalTransactionState.ROLLBACK_MESSAGE;
}

// 泛型擦除,我们需要手动进行转换,但调用方保证了类型安全
@SuppressWarnings("unchecked")
TransactionArg<Object> genericTxArg = (TransactionArg<Object>) txArg;

Long bizTxId = genericTxArg.txId();
String rocketMqTxId = msg.getTransactionId();

log.info("[TX] Executing local transaction. BizTxId: {}, RocketMQTxId: {}", bizTxId, rocketMqTxId);

// 2. 创建并保存事务日志 (预处理状态)
TransactionLogEntity txLog = new TransactionLogEntity();
txLog.setTransId(bizTxId);
txLog.setBizType(genericTxArg.handler().getBizType());
txLog.setPayload(rocketMqTxId); // 存储RocketMQ的事务ID,用于反查
txLog.setState(TransactionState.UNKNOWN.name()); // 初始状态为 UNKNOWN
txLog.setRetries(0);
try {
txLog.setArgs(objectMapper.writeValueAsString(genericTxArg.arg()));
} catch (Exception e) {
log.error("[TX] Failed to serialize transaction args. BizTxId: {}", bizTxId, e);
txLog.setArgs("Serialization Failed");
}

try {
logRepository.save(txLog);
} catch (Exception dbError) {
log.error("[TX] Failed to save initial transaction log. BizTxId: {}", bizTxId, dbError);
// 连日志库都挂了,无法继续,只能返回 UNKNOWN 等待回查
return LocalTransactionState.UNKNOW;
}

// 3. 执行真正的本地事务
try {
// 调用 TransactionHandler 中定义的本地业务逻辑
genericTxArg.handler().executeLocal(genericTxArg.arg());

// 4. 本地事务成功:更新日志状态为 COMMITTED
txLog.setState(TransactionState.COMMITTED.name());
txLog.setUpdateTime(new Date());
logRepository.save(txLog);

log.info("[TX] Local transaction committed. BizTxId: {}", bizTxId);
// 返回 COMMIT,Broker 将使消息对消费者可见
return LocalTransactionState.COMMIT_MESSAGE;

} catch (Exception e) {
// 5. 本地事务失败:更新日志状态为 ROLLBACK
log.error("[TX] Local transaction failed, rolling back. BizTxId: {}", bizTxId, e);

txLog.setState(TransactionState.ROLLBACK.name());
txLog.setUpdateTime(new Date());
logRepository.save(txLog);

// 返回 ROLLBACK,Broker 将删除此消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}

// 注意:如果 executeLocal 成功,但在更新日志为 COMMITTED 时DB又挂了,
// 日志状态将保持为 UNKNOWN。这没问题,后续的 'checkLocalTransaction' 会来处理。
}

/**
* 第二阶段:回查本地事务状态
* 当 Broker 长时间未收到 'executeLocalTransaction' 的响应 (COMMIT/ROLLBACK) 时, (例如
* executeLocalTransaction 返回了 UNKNOW,或者生产者在返回前崩溃) Broker 会回调此方法来 "check" 事务的最终状态。
* * @param msg 消息
*
* @return 本地事务的最终状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String rocketMqTxId = msg.getTransactionId();
// 在 executeLocalTransaction 时,将 bizTxId 存入了 msg 的属性中
String bizTxIdStr = msg.getProperty("BIZ_TX_ID");
if (bizTxIdStr == null) {
logger.warn("[TX-Check] 'BIZ_TX_ID' property not found in message. RocketMQTxId: {}. THIS IS UNUSUAL.", rocketMqTxId);
// 降级:尝试通过 RocketMQ ID 查询 (前提是 executeLocalTransaction 中 payload 存的是 RocketMQ ID)
// 在我们的设计中,我们使用 bizTxId 作为主查询键,所以这里只能返回 UNKNOWN
return LocalTransactionState.UNKNOW;
}

Long bizTxId = Long.parseLong(bizTxIdStr);
logger.info("[TX-Check] Checking local transaction state. BizTxId: {}, RocketMQTxId: {}",
bizTxId, rocketMqTxId);

// 1. 根据业务ID查询事务日志
Optional<TransactionLogEntity> txLogOpt = logRepository.findByTransId(bizTxId);

if (txLogOpt.isEmpty()) {
// 极端情况:executeLocalTransaction 连第一条日志都没存进去就挂了
logger.warn("[TX-Check] Transaction log not found for BizTxId: {}. Assuming ROLLBACK.",bizTxId);
// 理论上应该回滚,因为本地事务很可能没执行
return LocalTransactionState.ROLLBACK_MESSAGE;
}

TransactionLogEntity txLog = txLogOpt.get();
// 2. 根据日志状态返回结果
TransactionState state = TransactionState.valueOf(txLog.getState());
switch (state) {
case COMMITTED:
logger.info("[TX-Check] State is COMMITTED. BizTxId: {}", bizTxId);
return LocalTransactionState.COMMIT_MESSAGE;
case ROLLBACK:
logger.info("[TX-Check] State is ROLLBACK. BizTxId: {}", bizTxId);
return LocalTransactionState.ROLLBACK_MESSAGE;
case UNKNOWN:
default:
// 状态是 UNKNOWN,意味着 executeLocalTransaction 没执行完,或者执行完了但更新状态失败
logger.warn("[TX-Check] State is UNKNOWN. BizTxId: {}. Retries: {}", bizTxId, txLog.getRetries());
// 增加重试次数
txLog.setRetries(txLog.getRetries() + 1);
logRepository.save(txLog);
// 超过最大重试次数,标记为回滚 (或转人工)
if (txLog.getRetries() > MAX_CHECK_RETRIES) {
logger.error("[TX-Check] Max retries exceeded for BizTxId: {}. Force ROLLBACK.", bizTxId);
txLog.setState(TransactionState.ROLLBACK.name());
logRepository.save(txLog);
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
// 继续返回 UNKNOWN,等待 Broker 下一次回查
return LocalTransactionState.UNKNOW;
}
}
}
}

创建事务管理,简化业务使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TransactionManager {

private static final Logger logger = LoggerFactory.getLogger(TransactionManager.class);

private final TransactionMQProducer transactionMQProducer;

TransactionManager(TransactionMQProducer transactionMQProducer) {
this.transactionMQProducer = transactionMQProducer;
}

// 模拟的全局唯一ID生成器 (生产环境应使用Snowflake或Redis)
private final AtomicLong txIdGenerator = new AtomicLong(System.currentTimeMillis());

public <T> SendResult sendInTransaction(String topic,
String tags,
String messageBody,
TransactionHandler<T> handler,
T localTxArg) throws MQClientException {

// 1. 生成全局唯一的业务事务ID
long bizTxId = txIdGenerator.incrementAndGet();

// 2. 准备消息
Message msg = new Message(topic, tags, messageBody.getBytes(StandardCharsets.UTF_8));

// 关键:将业务事务ID放入消息属性,这样 'checkLocalTransaction' 才能通过 msg.getProperty() 获取到它
msg.putUserProperty("BIZ_TX_ID", String.valueOf(bizTxId));
msg.putUserProperty("BIZ_TYPE", handler.getBizType());

// 3. 准备 'arg' 参数,这个 'arg' 会被传递给 'executeLocalTransaction'
TransactionArg<T> transactionArg = new TransactionArg<>(bizTxId, localTxArg, handler);

// 4. 发送事务消息
logger.info("[TX-Send] Sending transaction message. BizTxId: {}", bizTxId);

// 此方法会:
// 1. 发送 Half Message
// 2. (成功后) 立即同步调用 'executeLocalTransaction'
// 3. (根据返回结果) 提交或回滚
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, transactionArg);

logger.info("[TX-Send] SendResult: {}. BizTxId: {}", sendResult.getSendStatus(), bizTxId);
return sendResult;
}
}

使用示例

创建 CreateOrderTransactionHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class CreateOrderTransactionHandler implements TransactionHandler<OrderInfo> {

private static final Logger logger = LoggerFactory.getLogger(CreateOrderTransactionHandler.class);

@Override
public void executeLocalTransaction(OrderInfo order, Long txId) throws Exception {
logger.info("[Local-TX] 开始执行本地事务:创建订单...");
logger.info("[Local-TX] 正在将订单写入数据库... OrderId: {}", order.orderId());
//todo 写入订单库表

logger.info("[Local-TX] 订单写入数据库成功. OrderId: {}", order.orderId());
// 注意:这里不需要发消息,这里只负责【本地事务】,消息的发送由 TransactionManager负责
}

@Override
public String getBizType() {
return "CREATE_ORDER";
}
}

OrderSercie 接入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class OrderService {

private final TransactionManager transactionManager;
private final CreateOrderTransactionHandler createOrderHandler;
/**
* 创建订单 (事务性操作)
*/
public String createOrder(String product, long amount) throws Exception {
// 1. 准备业务数据
OrderInfo order = new OrderInfo(UUID.randomUUID().toString().substring(0, 8),
1001L,
product,
amount);

// 2. 定义要发送给下游 (如:积分服务、通知服务) 的消息体,消息体通常只包含ID,让下游反查,以避免数据不一致
String messageBody = "{\"orderId\": \"" + order.orderId() + "\"}";

// 3. 调用统一事务管理器
SendResult result = transactionManagerService.sendInTransaction(
"TOPIC_ORDER_CREATED", // 消息主题
"TAG_ORDER", // 消息标签
messageBody, // 发送给消费者的消息体
createOrderHandler, // 【关键】本地事务处理器
order // 【关键】本地事务需要的参数
);

return "Order created. OrderId: " + order.orderId() + ", SendStatus: "
+ result.getSendStatus();
}

}

总结

上面基于RocketMQ实现了一个统一的 DistributedTransactionListener,它不关心具体业务,只负责调度 handler 和维护事务日志,具有极高的可扩展性。业务方(如OrderService)只需注入对应的Handler和TransactionManagerService即可发起事务。


分布式事务原理与实践
http://example.com/2025/07/23/事务-分布式事务原理与实践/
作者
ares
发布于
2025年7月23日
许可协议