使用@TransactionalEventListener监听事务教程

目录

  • @TransactionalEventListener监听事务
    • 最近在项目遇到一个问题
    • 解决办法:@TransactionalEventListener
    • 拓展
  • 注解@TransactionalEventListener
    • 监听的对象
    • 监听到之后的操作

@TransactionalEventListener监听事务 【使用@TransactionalEventListener监听事务教程】项目背景

最近在项目遇到一个问题
A方法体内有 INSERT、UPDATE或者DELETE操作,最后会发送一段MQ给外部,外部接收到MQ后会再发送一段请求过来,系统收到请求后会执行B方法,B方法会依赖A方法修改后的结果,这就有一个问题,如果A方法事务没有提交;且B方法的请求过来了会查询到事务未提交前的状态,这就会有问题

解决办法:@TransactionalEventListener
在Spring4.2+,有一种叫做TransactionEventListener的方式,能够控制在事务的时候Event事件的处理方式。 我们知道,Spring的发布订阅模型实际上并不是异步的,而是同步的来将代码进行解耦。而TransactionEventListener仍是通过这种方式,只不过加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等的时候才会去进行Event的处理。
具体实现
//创建一个事件类package com.qk.cas.config; import org.springframework.context.ApplicationEvent; public class MyTransactionEvent extends ApplicationEvent {private static final long serialVersionUID = 1L; private IProcesser processer; public MyTransactionEvent(IProcesser processer) {super(processer); this.processer = processer; }public IProcesser getProcesser() {return this.processer; }@FunctionalInterfacepublic interface IProcesser {void handle(); }}//创建一个监听类package com.qk.cas.config; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @Componentpublic class MyTransactionListener {@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void hanldeOrderCreatedEvent(MyTransactionEvent event) {event.getProcesser().handle(); }}//MQ方法的变动@Autowiredprivate ApplicationEventPublisher eventPublisher; @Autowiredprivate AmqpTemplate rabbitTemplate; public void sendCreditResult(String applyNo, String jsonString) {eventPublisher.publishEvent(new MyTransactionEvent(() -> {LOGGER.info("MQ。APPLY_NO:[{}]。KEY:[{}]。通知报文:[{}]", applyNo, Queues.CREDIT_RESULT, jsonString); rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString); })); }


拓展
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有当前事务提交之后,才会执行事件监听的方法,其中参数phase默认为AFTER_COMMIT,共有四个枚举:
public enum TransactionPhase {/*** Fire the event before transaction commit.* @see TransactionSynchronization#beforeCommit(boolean)*/BEFORE_COMMIT,/*** Fire the event after the commit has completed successfully.* Note: This is a specialization of {@link #AFTER_COMPLETION} and* therefore executes in the same after-completion sequence of events,* (and not in {@link TransactionSynchronization#afterCommit()}).* @see TransactionSynchronization#afterCompletion(int)* @see TransactionSynchronization#STATUS_COMMITTED*/AFTER_COMMIT,/*** Fire the event if the transaction has rolled back.* Note: This is a specialization of {@link #AFTER_COMPLETION} and* therefore executes in the same after-completion sequence of events.* @see TransactionSynchronization#afterCompletion(int)* @see TransactionSynchronization#STATUS_ROLLED_BACK*/AFTER_ROLLBACK,/*** Fire the event after the transaction has completed.* For more fine-grained events, use {@link #AFTER_COMMIT} or* {@link #AFTER_ROLLBACK} to intercept transaction commit* or rollback, respectively.* @see TransactionSynchronization#afterCompletion(int)*/AFTER_COMPLETION}


注解@TransactionalEventListener 例如 用户注册之后需要计算用户的邀请关系,递归操作。如果注册的时候包含多步验证,生成基本初始化数据,这时候我们通过mq发送消息来处理这个邀请关系,会出现一个问题,就是用户还没注册数据还没入库,邀请关系就开始执行,但是查不到数据,导致出错。
@TransactionalEventListener 可以实现事务的监听,可以在提交之后再进行操作。

监听的对象
package com.jinglitong.springshop.interceptor; import com.jinglitong.springshop.entity.Customer; import org.springframework.context.ApplicationEvent; public class RegCustomerEvent extends ApplicationEvent{public RegCustomerEvent(Customer customer){super(customer); }}


监听到之后的操作
package com.jinglitong.springshop.interceptor; import com.alibaba.fastjson.JSON; import com.jinglitong.springshop.entity.Customer; import com.jinglitong.springshop.entity.MqMessageRecord; import com.jinglitong.springshop.servcie.MqMessageRecordService; import com.jinglitong.springshop.util.AliMQServiceUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; import java.util.Date; import java.util.HashMap; import java.util.Map; @Component@Slf4jpublic class RegCustomerListener { @Value("${aliyun.mq.order.topic}")private String topic; @Value("${aliyun.mq.regist.product}")private String registGroup; @Value("${aliyun.mq.regist.tag}")private String registTag; @AutowiredMqMessageRecordService mqMessageRecordService; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {Customer cust = (Customer) regCustomerEvent.getSource(); Map map = new HashMap(); map.put("custId", cust.getZid()); map.put("account", cust.getAccount()); log.info("put regist notice to Mq start"); String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup); MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup); if(StringUtils.isEmpty(hdResult)) {insert.setStatus(false); }else {insert.setStatus(true); }mqMessageRecordService.insertRecord(insert); log.info("put regist notice to Mq end"); log.info("regist notice userId : " + cust.getAccount()); } private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {MqMessageRecord msg = new MqMessageRecord(); msg.setFlowId(custId); msg.setGroupName(groupId); msg.setTopic(topic); msg.setTag(tag); msg.setMsgId(result); msg.setDataBody(jsonStr); msg.setSendType(3); msg.setGroupType(1); msg.setCreateTime(new Date()); return msg; } }@Autowiredprivate ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));

这样可以确保数据入库之后再进行异步计算
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

    推荐阅读