分布式事务:分布式条件下,多个节点操作的整体事务一致性。
特别是在微服务场景下,业务 A 和业务 B 关联,事务 A 成功,事务 B 失败,由于跨系统, 就会导致不被感知。 此时从整体来看,数据是不一致的。
分布式事务中的两大基本理论:CAP 理论 与 Base 理论。
分布式事务解决方案可以分为:
- 强一致性分布式事务解决方案:基于 CAP 理论
- 最终一致性分布式事务解决方案:基于 Base 理论
在最终一致性分布式事务解决方案中,典型的方案包括:
- TCC 解决方案
- 可靠消息最终一致性解决方案
- 最大努力通知型解决方案
TCC 解决方案
适用场景: 适用于具有强隔离性、严格一致性要求的业务场景,也适用于执行时间比较短的业务。
TCC 方案执行流程:

- try 阶段: 仅做业务的一致性检查和预留相应的资源。
- confirm 阶段: 当 try 阶段所有分支事务执行成功后开始执行 confirm 阶段。
- 默认一定成功。出错(异常):就要 重试或者人工处理,对出错的事务进行干预。
- cancel 阶段: 在业务执行异常或出现错误的情况下,需要回滚事务的操作,执行分支事务的取消操作,并且释放 try 阶段预留的资源。
- 默认一定成功。出错(异常):就要 重试或者人工处理,对出错的事务进行干预。
TCC 方案的优点:
- 锁定资源的粒度变小: 提升系统的性能。
- 保证分布式事务执行后数据一致性: confirm 阶段 和 cancel 阶段需具备幂等性。
- 解决 XA 规范的单点故障问题: 主业务和分支业务都能集群部署。
TCC 方案的缺点:
- 耦合性: 代码需要耦合到具体业务。
- 开发成本: 业务方法都要拆分成 try、confirm 和 cancel 三个阶段。
TCC 需要注意的问题
使用 TCC 方案解决分布式事务问题时,需要注意空回滚、悬挂和幂等的问题。
(1)空回滚问题
空回滚出现的原因: 服务器宕机或者网络发生故障,未执行 try 阶段(或执行到一半)。
解决方案: 判断是否执行了 try 阶段的方法
- 全局事务 ID:生成全局事务记录,贯穿整个分布式事务的执行流程。
- 分支事务记录表:用于记录分支事务,将全局事务ID 和 分支事务 ID 保存到分支事务表中。
- 执行 cancel 阶段前,先读取分支事务表中的数据:
- 若存在 try 阶段插入的记录,则执行正常操作 - 回滚事务
- 若不存在,则为空回滚,不做任何操作
(2)悬挂问题
悬挂问题出现的原因: 预留业务资源后,无法继续往下处理。
- try 阶段:先注册分支事务,再执行 RPC 调用
- 此时发生服务器宕机、应用崩溃或者网络异常等,RPC 调用超时
- 判定RPC 调用超时,就会回滚事务
- 这时,RPC 请求到了对应业务方,但此时事务已经回滚,try 阶段预留的资源就无法释放了
解决方案: 执行了 confirm 或 cancel 阶段,就不能再执行 try 阶段
- 在执行 try 阶段的方法时,判断是否已有执行 confirm 或 cancel 阶段的记录
- 如果存在,则不再执行 try 阶段的方法
(3)幂等问题
幂等主要是各业务方需要解决的业务问题。
幂等问题出现的原因: 服务器宕机、应用崩溃或网络异常等原因,出现方法调用超时。
解决方案: 可查状态
- 增加事务的执行状态
- 每次执行分支事务以及 confirm 阶段 和 cancel 阶段的方法时,都查询此事务的执行状态
实际工作中 TCC 三种方案
(1)通用型
通用型,最常用的。
工作模板如下:
// 消息队列 + 事务消息
public void doBusiness() {
// 消息队列名称
String queueName = "queue";
// 消息内容:json 格式
String msg = "{}";
// 调用 MQ,预发送消息
String msgId = msgService.createPreMsg(queueName, msg);
try {
// 执行业务1 try(业务层面需要做好幂等、悬挂)
// 执行业务2 try(业务层面需要做好幂等、悬挂)
// 执行业务3 try(业务层面需要做好幂等、悬挂)
} catch (Exception e) {
// 回滚业务1 cancel(业务层面需要做好幂等、悬挂、空回滚)
// 回滚业务2 cancel(业务层面需要做好幂等、悬挂、空回滚)
// 回滚业务3 cancel(业务层面需要做好幂等、悬挂、空回滚)
}
RpcContext.getContext().asyncCall(() -> {
// 执行业务1 commit(业务层面需要做好幂等、悬挂)
// 执行业务2 commit(业务层面需要做好幂等、悬挂)
// 执行业务3 commit(业务层面需要做好幂等、悬挂)
msgService.confirmMsg(queueName, msgId);
});
}
(2)异步确保型
你对接别人,还要别人加接口,别人肯定不愿意啊,还麻烦。
异步确保型 TCC 技术方案: 引入 可靠消息服务
- 优点:不要从业务服务进行配合改造,提供 try、confirm、cancel 3个接口

(3)补偿型
在通用型基础上进行简化,只需再提供补偿接口,业务改造量小。
补偿型 TCC 技术方案: 从业务服务只需要提供 Do 和 Compensate 两个接口。
try 阶段有的话,也可能只做一些数据的校验。
- Do 接口:执行接口,执行业务逻辑
- Compensate 接口:补偿接口
Tips:Saga 事务也是类似。

二、Hmily-TCC 实战
以前不使用 hmily ,大部分是因为他很多配置还是 xml 形式,比较麻烦。
所以之前都是推荐使用 byte-tcc。
接入 Hmily-TCC:
1.pom依赖导入:不同服务调用方式,导入不同的配置
<!-- dubbo -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-dubbo</artifactId>
<version>${hmily.version}</version>
<exclusions>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- springcloud -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-springcloud</artifactId>
<version>${hmily.version}</version>
</dependency>
2.开发:使用 hmliy 注解
// 1. try 阶段
@Transactional(rollbackFor = Exception.class)
@HmilyTCC(confirmMethod = "confirm", cancelMethod = "cancel")
public void doBusiness() {
String txNo;
// 1. 幂等处理:若已处理过,则直接返回
// 2. 悬挂处理:若已处理过,则直接返回
// 3. 业务、RPC调用等
}
// 2. confirm 阶段
@Transactional(rollbackFor = Exception.class)
public void confirm(){
String txNo;
// 幂等处理
// do something
}
// 3. cancel 阶段
@Transactional(rollbackFor = Exception.class)
public void cancel(){
String txNo;
// 幂等处理
// do something
}
3.服务间的调用
// 以 SpringCloud 中 Feign 为栗
// 需要加上注解:@Hmily
@FeignClient(value = "account-service")
public interface AccountClient {
@Hmily
@RequestMapping("/account-service/account/payment")
Boolean payment(@RequestBody AccountDTO accountDO);
}
实战:模拟下订单减库存
以官方 demo 为栗。
- 拉取代码,编译
$ git clone git@github.com:dromara/hmily.git
$ cd hmily/
$ mvn -DskipTests clean install -U
复制代码
- 构建项目,这里以 springcloud 为例,使用 hmily-demo-tcc-springcloud 工程

- 执行 MySQL 脚本
# 脚本在
$ cd ./hmily-demo/sql
hmily-demo.sql
- 修改项目配置
- 订单服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 账号服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 库存服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 注册中心:使用的是 eureka,不用修改
- 启动服务,先启动 eureka,再相继启动其他服务

- 验证:访问 swagger 中 /order/orderPay 接口
- 浏览器访问:http://127.0.0.1:8090/swagger-ui.html

三、Hmily-TCC 源码浅析
(1)框架初始化阶段
hmily-spring-boot-starter 下有 META-INF/spring.factories 文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.hmily.spring.boot.starter.parent.configuration.HmilyAutoConfiguration
hmily 框架会随着应用程序的启动而启动,并初始化类 HmilyAutoConfiguration:
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class HmilyAutoConfiguration {
// 1. 处理添加 @HmilyTCC 注解的切面入口
@Bean
public SpringHmilyTransactionAspect hmilyTransactionAspect() {
return new SpringHmilyTransactionAspect();
}
// 2. 支持使用注解调用的 RPC 框架
@Bean
@ConditionalOnProperty(value = "hmily.support.rpc.annotation", havingValue = "true")
public BeanPostProcessor refererAnnotationBeanPostProcessor() {
return new RefererAnnotationBeanPostProcessor();
}
// 3. 框架启动初始化类
@Bean
@Qualifier("hmilyTransactionBootstrap")
@Primary
public HmilyApplicationContextAware hmilyTransactionBootstrap() {
return new HmilyApplicationContextAware();
}
}
(2)TCC 的 Try 阶段

入口 @HmilyTCC:为 Hmily 框架处理 TCC 事务的切面:
// AbstractHmilyTransctionAspect是SpringHmilyTransactionAspect的父类。
@Aspect
public abstract class AbstractHmilyTransactionAspect {
private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor();
@Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC) || @annotation(org.dromara.hmily.annotation.HmilyXA)")
public void hmilyInterceptor() {
}
// 切面环绕执行
@Around("hmilyInterceptor()")
public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return interceptor.invoke(proceedingJoinPoint);
}
}
拦截器拦截,进入 HmilyTransactionInterceptor,主要干 2 件事情:
- select(context):根据 Hmily 事务上下文,获取事务处理器
- 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler
- handleTransaction():执行 Hmily 事务
public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor {
// ... ...
static {
// 根据引入不同的 RPC 支持包,获取不同的 RPC 参数加载器
// ===== 重点 =====
// 因为使用的是 SpringCloud,所以获取的是 SpringCloudParameterLoader
parameterLoader;
}
@Override
public Object invoke(final ProceedingJoinPoint pjp) throws Throwable {
HmilyTransactionContext context = parameterLoader.load();
return invokeWithinTransaction(context, pjp);
}
private Object invokeWithinTransaction(final HmilyTransactionContext context,
final ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
// ===== 重点 =====
// 获取事务处理器,进行事务处理
// 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler
return getRegistry(signature.getMethod()).select(context)
.handleTransaction(point, context);
}
// ... ...
}
具体事务处理器执行: 首先会做预处理 preTry,即分布式事务开始的一些准备
public class StarterHmilyTccTransactionHandler
implements HmilyTransactionHandler, AutoCloseable {
@Override
public Object handleTransaction(final ProceedingJoinPoint point,
final HmilyTransactionContext context) throws Throwable {
Object returnValue;
try {
// 0. 这块主要做:创建主事务、存储、构建分支事务,创建事务上下文等。
HmilyTransaction hmilyTransaction = executor.preTry(point);
try {
// 执行切面进入点的原始 try 方法,也就是上文提到的 makePayment 方法
returnValue = point.proceed();
// try 执行成功事务日志状态
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
executor.updateStartStatus(hmilyTransaction);
} catch (Throwable throwable) {
// 如果出现异常, 异步执行 cancel 方法
disruptor.getProvider().onData(() -> {
executor.globalCancel(currentTransaction);
});
throw throwable;
}
// try 方法执行成功,执行 confirm方法
disruptor.getProvider().onData(() -> {
executor.globalConfirm(currentTransaction);
});
} finally {
// 清理资源与缓存
// 记录调用耗时时间
}
return returnValue;
}
}
需要注意的是:confirm 和 cancel 是异步执行时,会有数据异常问题
场景:本先更新操作再插入操作,异步后可能变为先插入再更新了。
为了保证事务数据的一致性:会根据事务 Id 一致性哈希算法。
同一个事务 Id 会被同一线程顺序执行。
使用 HmilyContext 设置事务上下文有两种模式:
- 默认 ThreadLocal
- TransimttableThreadLocal:阿里提供的跨线程 ThreadLocal 的实现
RPC 调用:Feign
分布式事务的 RPC 进行调用: 通过 拦截器在 header 里设置事务上下文
@Configuration
public class HmilyFeignConfiguration {
// 1. 对 RPC 调用进行参数的传递
@Bean
@Qualifier("hmilyFeignInterceptor")
public RequestInterceptor hmilyFeignInterceptor() {
return new HmilyFeignInterceptor();
}
// 2. 对添加了 Hmily 注解的 Bean 实例进行代理
@Bean
public HmilyFeignBeanPostProcessor feignPostProcessor() {
return new HmilyFeignBeanPostProcessor();
}
// 3. 处理 Hystrix 跨线程传递参数问题
@Bean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public HystrixConcurrencyStrategy hmilyHystrixConcurrencyStrategy() {
return new HmilyHystrixConcurrencyStrategy();
}
}
1.对 RPC 调用进行参数的传递
public class HmilyFeignInterceptor implements RequestInterceptor {
@Override
public void apply(final RequestTemplate requestTemplate) {
// 在 header 中设置事务上下文
RpcMediator.getInstance().transmit(requestTemplate::header,
HmilyContextHolder.get());
}
}
2.对添加了 Hmily 注解的 Bean 实例进行代理
public class HmilyFeignHandler implements InvocationHandler {
@Override
public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else {
// 获取事务上下文
final HmilyTransactionContext context = HmilyContextHolder.get();
if (Objects.isNull(context)) {
// 如果为空,则进行正常调用
return this.delegate.invoke(proxy, method, args);
}
final Hmily hmily = method.getAnnotation(Hmily.class);
if (Objects.isNull(hmily)) {
// 如果为空,则进行正常调用
return this.delegate.invoke(proxy, method, args);
}
try {
// 构建参与者对象,进行缓存
// ...
// 发起真正的调用
final Object invoke = delegate.invoke(proxy, method, args);
// 如果调用成功,缓存参与者对象至发起者
if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) {
// ...
} else {
// ...
}
return invoke;
} catch (Throwable e) {
LOGGER.error("HmilyFeignHandler invoker exception :", e);
throw e;
}
}
}
}
(3)TCC 的 Confirm 阶段
在所有 Try 流程执行完成,且没有异常的情况下:
- 使用 disrupto 队列异步执行executor.globalConfirm(currentTransaction);
public final class HmilyTccTransactionExecutor {
public void globalConfirm(final HmilyTransaction currentTransaction)
throws HmilyRuntimeException {
// 更新事务状态为 confirm
currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
// 从本地缓存里面获取所有的参与者对象
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
// 如果参与者的角色是发起者
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) {
// 执行本地调用
} else {
// 执行 RPC 调用
}
successList.add(true);
} catch (Throwable e) {
//...
} finally {
HmilyContextHolder.remove();
}
}
if (successList.stream().allMatch(e -> e)) {
// 如果每个参与者都执行成功,删除主事务
HmilyRepositoryStorage.removeHmilyTransaction(currentTransaction);
}
}
}
(4)TCC 的 Cancel 阶段
Cancel流程是在分布式事务发起方在 Try 阶段有异常时调用:
executor.globalCancel(currentTransaction);
public final class HmilyTccTransactionExecutor {
public void globalCancel(final HmilyTransaction currentTransaction) {
// 更新事务日志状态为 cancel
currentTransaction.setStatus(HmilyActionEnum.CANCELING.getCode());
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
// 如果是发起者,执行本地调用
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) {
} else {
// 执行远端 RPC 调用
}
} catch (Throwable e) {
// ... ...
} finally {
HmilyContextHolder.remove();
}
}
}
}
(5)事务恢复
事务恢复日志只针对非常特殊、极少的场景,在正常的流程中都会被清理掉。
出现场景:
- 在执行 try 阶段方法时,服务宕机
- 执行 confirm 阶段方法时,有RPC 服务调用不成功
- 执行 cancel 阶段方法时,有RPC 服务调用不成功
解决方法: 定时调度
在初始化 Hmily 框架启动阶段,创建并启动此定时任务。
- 默认 60s 执行一次
- 默认最大重试次数 10次,超过则将事务日志设置成 DEATH 状态,需要人工处理。
public class HmilyTransactionSelfRecoveryScheduled implements AutoCloseable {
private void selfTccRecovery() {
selfTccRecoveryExecutor.scheduleWithFixedDelay(() -> {
try {
// ...
for (HmilyParticipant hmilyParticipant : hmilyParticipantList) {
// 1. 判断是否超过最大重试次数
if (hmilyParticipant.getRetry() > hmilyConfig.getRetryMax()) {
// 更新日志状态为 DEATH
continue;
}
// 2. 如果事务处于 PRE_TRY 状态,即 try 还没执行,则无需处理
// 3. 锁事务日志:避免多个定时任务同时执行
// 若采用数据库来存储,则通过更新 version 字段来获取锁
final boolean successful
= hmilyRepository.lockHmilyParticipant(hmilyParticipant);
if (successful) {
// 根据全局事务id 获取全局事务对象
HmilyTransaction globalHmilyTransaction;
// 如果没有全局事务,证明事务流程已经完成
// 则根据自身的事务状态进行恢复
// 这种场景常见于 RPC 接口调用超时,但是自身执行又成功
if (Objects.isNull(globalHmilyTransaction)) {
tccRecovery(hmilyParticipant.getStatus(), hmilyParticipant);
} else {
// 根据全局事务状态进行恢复
tccRecovery(globalHmilyTransaction.getStatus(),
hmilyParticipant);
}
}
}
} catch (Exception e) {
LOGGER.error("hmily scheduled transaction log is error:", e);
}
}, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledRecoveryDelay(), TimeUnit.SECONDS);
}
private void tccRecovery(final int status, final HmilyParticipant hmilyParticipant) {
// 如果事务状态是 TRYING 和 CANCELING,执行 cancel 阶段
if (status == HmilyActionEnum.TRYING.getCode()
|| status == HmilyActionEnum.CANCELING.getCode()) {
hmilyTransactionRecoveryService.cancel(hmilyParticipant);
} else if (status == HmilyActionEnum.CONFIRMING.getCode()) {
// 反之,执行 confirm 阶段
hmilyTransactionRecoveryService.confirm(hmilyParticipant);
}
}
}
(6)事务日志存储
对于分布式事务来说,事务日志至关重要。

在事务日志的存储上,Hmily 支持多种介质:File、Redis、MySQL、Zookeeper 等。
这里介绍以 MySQL 为主,其 sql 脚本位于:resource/mysql/schema.sql。
TCC 事务日志的结构主要由 3 个类构成:
- HmilyTransaction:事务主体类,包含多个 HmilyParticipant, 对应 hmily_transaction_global 表
- HmilyParticipant:分支事务类,包含多个 HmilyInvocation,对应 hmily_transaction_participant 表
- HmilyInvocation:事务方法的参数列表实体类
HmilyBootstrap 框架初始化时,会创建初始化事务恢复调度器:
- TCC 事务恢复单线程池:selfTccRecoveryExecutor
- TAC 事务恢复单线程池:selfTacRecoveryExecutor
- 事务日志清理线程池:cleanHmilyTransactionExecutor
- 物理删除线程池:phyDeletedExecutor
Hmily 采用高性能队列 disruptor 进行事务日志的异步存储:
- HmilyRepositoryEventPublisher:进行初始化
- TCC 事务中,事务状态变化均通过这个
作者:卷几你哇链接:https://juejin.cn/post/7122369173812379661来源:*土稀**掘金