最终一致性事务解决方案
单体秒杀服务转dubbo框架+分布式事务实现
方案和技术架构
方案:秒杀方案(
)+ 分布式事务解决方案 (为了让支付、扣减库存和订单状态一致性解决方案,见下图)
目的: dubbo 微服务化 实现订单支付分布式事务,统一提交和回滚
技术架构:
docker + nacos 架构 (舍去zookeeper,nacos更香)
docker 版本 2.7
【最终一致性事务解决方案】springboot 版本 2.6.1
分布式事务(tcc -transaction 1.73)
github地址:https://github.com/luozijing/...
项目体验地址:http://81.69.254.72:9082/logi...
文章图片
tcc-transaction
TCC(Try Confirm Cancel)方案是一种应用层面侵入业务的两阶段提交。是目前最火的一种柔性事务方案,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。
TCC分为两个阶段,分别如下:
- 第一阶段:Try(尝试),主要是对业务系统做检测及资源预留 (加锁,预留资源)
- 第二阶段:本阶段根据第一阶段的结果,决定是执行confirm还是cancel
-
- Confirm(确认):执行真正的业务(执行业务,释放锁)
- Cancle(取消):是预留资源的取消(出问题,释放锁)
文章图片
- TCC 事务机制以初步操作(Try)为中心的,确认操作(Confirm)和取消操作(Cancel)都是围绕初步操作(Try)而展开。因此,Try 阶段中的操作,其保障性是最好的,即使失败,仍然有取消操作(Cancel)可以将其执行结果撤销。
- Try阶段执行成功并开始执行
Confirm
阶段时,默认Confirm
阶段是不会出错的。也就是说只要Try
成功,Confirm
一定成功(TCC设计之初的定义) 。 - Confirm与Cancel如果失败,由TCC框架进行==重试==补偿
- 存在极低概率在CC环节彻底失败,则需要定时任务或人工介入、
- 性能提升:具体业务来实现控制资源锁的粒度变小,不会锁定整个资源。
- 数据最终一致性:基于 Confirm 和 Cancel 的幂等性,保证事务最终完成确认或者取消,保证数据的一致性。
- 可靠性:解决了 XA 协议的协调者单点故障问题,由主业务方发起并控制整个业务活动,业务活动管理器也变成多点,引入集群。
- TCC 的 Try、Confirm 和 Cancel 操作功能要按具体业务来实现,业务耦合度较高,提高了开发成本。
文章图片
tcc源码分析
更具体的流程可以看这
https://blog.csdn.net/tianjin...
启动类通过@EnableTccTransaction注解,真正起作用的是下面的import,在同一个上下文注入相关的类,包括transactionRepository、springBeanFactory、recoveryLock、TransactionManager等类,TransactionManager在整个tcc事务中是一个事务管理类,作用包括begin、commit、rollback等。
@Import(TccTransactionConfigurationSelector.class)
除此之外,还注入上述两个切面,切面CompensableTransactionInterceptor的切点为@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)"),作用赋予事务角色和初始化状态。分事务角色和事务当前状态处理分布式事务。
当角色使root,即根事务,进入rootMethodProceed处理事务上下文(分布式上下文,微服务),来判断自身事务进入confirm、cancel还是try分支。
当角色使provider,即分支事务,进入providerMethodProceed处理事务上下文。
若没有事务角色,则直接执行切面的代理方法。
public Object interceptCompensableMethod(TransactionMethodJoinPoint pjp) throws Throwable {Transaction transaction = transactionManager.getCurrentTransaction();
CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction);
// if method is @Compensable and no transaction context and no transaction, then root
// else if method is @Compensable and has transaction context and no transaction ,then provider
switch (compensableMethodContext.getParticipantRole()) {
case ROOT:
return rootMethodProceed(compensableMethodContext);
case PROVIDER:
return providerMethodProceed(compensableMethodContext);
default:
return compensableMethodContext.proceed();
}
}
rootMethodProceed方法,很明确,由事务管理器创建事务transaction,然后执行服务调用,try catch住调用的服务的状态,若异常则,进行回滚处理执行cancel方法,若正常执行提交,执行confirm方法。
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {Object returnValue = https://www.it610.com/article/null;
Transaction transaction = null;
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
try {transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());
try {
returnValue = compensableMethodContext.proceed();
} catch (Throwable tryingException) {transactionManager.rollback(asyncCancel);
throw tryingException;
}transactionManager.commit(asyncConfirm);
} finally {
transactionManager.cleanAfterCompletion(transaction);
}return returnValue;
}
providerMethodProceed方法当上游事务的状态是try 或者是confirm是分别执行try 和commit,当是cancel时,会通过Transaction transaction = transactionRepository.findByXid(transactionContext.getXid()); 先去找之前的事务,因为执行cancel说明大概率之前事务时存在的,因此有了后面的判断,判断之前事务时try成功、try失败或者取消,都有可能需要回滚,另外其他情况就时走定时回复任务去回滚。
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {Transaction transaction = null;
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
try {switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
case TRYING:
transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
Object result = null;
try {
result = compensableMethodContext.proceed();
//TODO: need tuning here, async change the status to tuning the invoke chain performance
//transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, asyncSave);
transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, true);
} catch (Throwable e) {
transactionManager.changeStatus(TransactionStatus.TRY_FAILED);
throw e;
}return result;
case CONFIRMING:
try {
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
logger.info("no existed transaction found at CONFIRMING stage, will ignore and confirm automatically. transaction:" + JSON.toJSONString(transaction));
}
break;
case CANCELLING:try {//The transaction' status of this branch transaction, passed from consumer side.
int transactionStatusFromConsumer = compensableMethodContext.getTransactionContext().getParticipantStatus();
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
// Only if transaction's status is at TRY_SUCCESS、TRY_FAILED、CANCELLING stage we can call rollback.
// If transactionStatusFromConsumer is TRY_SUCCESS, no mate current transaction is TRYING or not, also can rollback.
// transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS may happen when transaction's changeStatus is async.
if (transaction.getStatus().equals(TransactionStatus.TRY_SUCCESS)
|| transaction.getStatus().equals(TransactionStatus.TRY_FAILED)
|| transaction.getStatus().equals(TransactionStatus.CANCELLING)
|| transactionStatusFromConsumer == ParticipantStatus.TRY_SUCCESS.getId()) {
transactionManager.rollback(asyncCancel);
} else {
//in this case, transaction's Status is TRYING and transactionStatusFromConsumer is TRY_FAILED
// this may happen if timeout exception throws during rpc call.
throw new IllegalTransactionStatusException("Branch transaction status is TRYING, cannot rollback directly, waiting for recovery job to rollback.");
}} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
logger.info("no existed transaction found at CANCELLING stage, will ignore and cancel automatically. transaction:" + JSON.toJSONString(transaction));
}
break;
}} finally {
transactionManager.cleanAfterCompletion(transaction);
}Method method = compensableMethodContext.getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}
另外一个切面是ConfigurableCoordinatorAspect,这个切面的作用是制造参与事务,参与事务的作用的是当Commit时所有的参与事务一起提交,回滚时,所有的参与事务一起回滚,来控制分布式服务。
public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable {Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) {Participant participant = enlistParticipant(pjp);
if (participant != null) {Object result = null;
try {
result = pjp.proceed(pjp.getArgs());
participant.setStatus(ParticipantStatus.TRY_SUCCESS);
} catch (Throwable e) {
participant.setStatus(ParticipantStatus.TRY_FAILED);
//if root transaction, here no need persistent transaction
// because following stage is rollback, transaction's status is changed to CANCELING and save
//transactionManager.update(participant);
//
throw e;
}return result;
}
}return pjp.proceed(pjp.getArgs());
}
业务实现 业务要实现订单、库存和支付的一致性事务,首先起一个根事务,该服务负责给前端返回结果,下面有两个子事务,一个是支付,另外一个是扣减库存和订单。
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
publicvoid makePayment(MiaoShaUser user, PaymentVo paymentVo) {
log.info("start tcc transaction try: {}", JSONObject.toJSONString(paymentVo));
// 支付
miaoShaUserService.pay( user, paymentVo);
// 扣减库存和更新订单
miaoshaService.completeOrder(user, paymentVo.getOrderId());
}public void confirmMakePayment(MiaoShaUser user, PaymentVo paymentVo) {
log.info("start tcc transaction confirm: {}", JSONObject.toJSONString(paymentVo));
//check if the trade order status is PAYING, if no, means another call confirmMakePayment happened, return directly, ensure idempotency.
}public void cancelMakePayment(MiaoShaUser user, PaymentVo paymentVo) {
log.info("start tcc transaction cancel: {}", JSONObject.toJSONString(paymentVo));
}
以支付为例子,支付的方法都做了幂等控制控制,通过支付记录和支付记录的状态来控制。
还需要特别注意的地方,防止事务悬挂,tcc-transaction默认支持悬挂,利用定时任务来回复悬挂事务。见下面源码
@Override
@Compensable(confirmMethod = "confirmPay", cancelMethod = "cancelPay", transactionContextEditor = DubboTransactionContextEditor.class)
@Transactional
/**
* 支付订单 预留扣款资源
*/
public ResultGeekQ pay(MiaoShaUser user, PaymentVo paymentVo) {
ResultGeekQ resultGeekQ= ResultGeekQ.build();
log.info("start tcc pay try, user:{}, paymentVo:{}", user, paymentVo);
MiaoshaPayment miaoshaPaymentDb = miaoshaPaymentDao.selectByOrderID(paymentVo.getOrderId());
MiaoshaUserAccount miaoshaUserAccountDb = miaoshaUserAccountDao.selectByUserID(user.getId());
// 金额是否足够用
if (miaoshaUserAccountDb.getBalanceAmount() == null
|| miaoshaUserAccountDb.getBalanceAmount().compareTo(paymentVo.getPayAmount()) < 0) {
throw new AccountException("支付金额不足");
}
// 判断支付记录是否存在,try具有重试机制,需要幂等性
if (miaoshaPaymentDb == null) {
// 账户欲扣款
MiaoshaUserAccount miaoshaUserAccount = new MiaoshaUserAccount();
miaoshaUserAccount.setUserId(user.getId());
miaoshaUserAccount.setTransferAmount(paymentVo.getPayAmount());
miaoshaUserAccountDao.updateByUserID(miaoshaUserAccount);
// 插入欲扣款记录
MiaoshaPayment miaoshaPayment = new MiaoshaPayment();
miaoshaPayment.setAmount(paymentVo.getPayAmount());
miaoshaPayment.setMiaoshaOrderId(paymentVo.getOrderId());
miaoshaPayment.setUserId(user.getId());
miaoshaPayment.setCreateTime(new Date());
miaoshaPayment.setUpdateTime(new Date());
miaoshaPayment.setStatus(Constants.PAY_DEALING);
miaoshaPayment.setVersion(1);
miaoshaPaymentDao.insertSelective(miaoshaPayment);
}return resultGeekQ;
}
//transaction type is BRANCH
switch (transaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
case TRY_FAILED:
rollbackTransaction(transactionRepository, transaction);
break;
case TRY_SUCCESS:if(transactionRepository.getRootDomain() == null) {
break;
}//check the root transaction
Transaction rootTransaction = transactionRepository.findByRootXid(transaction.getRootXid());
if (rootTransaction == null) {
// In this case means the root transaction is already rollback.
// Need cancel this branch transaction.
rollbackTransaction(transactionRepository, transaction);
} else {
switch (rootTransaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
rollbackTransaction(transactionRepository, transaction);
break;
default:
break;
}
}
break;
default:
// the transaction status is TRYING, ignore it.
break;
}}
推荐阅读
- 学习笔记(带你十天轻松搞定|学习笔记:带你十天轻松搞定 Go 微服务系列大结局(十)- 分布式事务)
- OceanBase 源码解读(八)(事务日志的提交和回放)
- PHP实现Hash环/Hash一致性原理
- MySQL 分布式事务的“路”与“坑”
- 面试题|分布式事务面试题 (史上最全、持续更新、吐血推荐)
- JAVA|spring事务失效的原因
- Spring的事务管理你了解吗
- mysql事务隔离级别
- Phthon|Python词云
- 异地多活的数据一致性简单设计