互联网架构|延时任务从入门到精通

1. 背景

在日常开发中,延时任务是一个无法避免的话题。为了达到延时这一目的,在不同场景下会有不同的解决方案,对各个方案优缺点的认知程度决定了架构决策的有效性。
本文章,以电商订单超时未支付为业务场景,推导多种解决方案,并对每个方案的优缺点进行分析,所涉及的方案包括:
1.数据库轮询方案。2.单机内存解决方案。3.分布式延时队列方案。
最后,为了提升研发效率,我们将使用声明式编程思想,对分布式延时队列方案进行封装,有效的分离 业务 与 技术。
1.1 业务场景 业务场景非常简单,就是大家最熟悉的电商订单,相信很多细心的小伙伴都发现,我们在电商平台下单后,如果超过一定的时间还未支付,系统自动将订单设置为超时自动取消,从而释放绑定的资源。
核心流程如下:
1.在电商平台下单,生成待支付订单;2.在规定的时间内没有完成支付,系统将自动取消订单,订单状态变成“超时取消”;3.在规定的时间内完成支付,订单将变成“已支付”
订单状态机如下:
互联网架构|延时任务从入门到精通
文章图片

状态机
1.2 基础组件简介 整个 Demo 采用 DDD 的设计思路,为了便于理解,先介绍所涉及的基础组件:
1.2.1. OrderInfo 订单聚合根,提供构建和取消等业务方法。具体的代码如下:
@Data @Entity @Table(name = "order_info") public class OrderInfo {@Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @Column(name = "status") @Enumerated(EnumType.STRING) private OrderInfoStatus orderStatus; @Column(name = "create_time") private Date createTime = new Date(); /** * 取消订单 */ public void cancel() { setOrderStatus(OrderInfoStatus.CANCELLED); }/** * 创建订单 * @param createDate * @return */ public static OrderInfo create(Date createDate){ OrderInfo orderInfo = new OrderInfo(); orderInfo.setCreateTime(createDate); orderInfo.setOrderStatus(OrderInfoStatus.CREATED); return orderInfo; } }

1.2.2 OrderInfoRepository 基于 Spring Data Jpa 实现,主要用于数据库访问,代码如下:
public interface OrderInfoRepository extends JpaRepository { List getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus created, Date overtime); }

Spring Data 会根据 方法签名 或 @Query 注解生成代理对象,无需我们写任何代码,便能实现基本的数据库访问。
1.2.3. OrderInfoService 应用服务层,面向 User Case,主要完成业务流程编排,核对代码如下:
@Service @Slf4j public class OrderInfoService { @Autowired private ApplicationEventPublisher eventPublisher; @Autowired private OrderInfoRepository orderInfoRepository; /** * 生单接口
* 1. 创建订单,保存至数据库 * 2. 发布领域事件,触发后续处理 * @param createDate */ @Transactional(readOnly = false) public void create(Date createDate){ OrderInfo orderInfo = OrderInfo.create(createDate); this.orderInfoRepository.save(orderInfo); eventPublisher.publishEvent(new OrderInfoCreateEvent(orderInfo)); }/** * 取消订单 * @param orderId */ @Transactional(readOnly = false) public void cancel(Long orderId){ Optional orderInfoOpt = this.orderInfoRepository.findById(orderId); if (orderInfoOpt.isPresent()){ OrderInfo orderInfo = orderInfoOpt.get(); orderInfo.cancel(); this.orderInfoRepository.save(orderInfo); log.info("success to cancel order {}", orderId); }else { log.info("failed to find order {}", orderId); } }/** * 查找超时未支付的订单 * @return */ @Transactional(readOnly = true) public List findOvertimeNotPaidOrders(Date deadLine){ return this.orderInfoRepository.getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus.CREATED, deadLine); } }

1.2.4. OrderController 对外暴露的 Web 接口,提供接口创建订单,主要用于测试,代码如下:
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderInfoService orderInfoService; /** * 生成新的订单,主要用于测试 */ @PostMapping("insertTestData") public void createTestOrder(){ Date date = DateUtils.addMinutes(new Date(), -30); date = DateUtils.addSeconds(date, 10); this.orderInfoService.create(date); } }

所依赖的组件介绍完了,让我们进入第一个方案。
2. 数据库轮询方案
这是最简单的方案,每个订单都保存了创建时间,只需要写个定时任务,从数据库中查询出已经过期但是尚未支付的订单,依次执行订单取消即可。
2.1. 方案实现 核心流程如下:
互联网架构|延时任务从入门到精通
文章图片

数据库轮询方案
1.用户创建订单,将订单信息保存到数据库;2.设定一个定时任务,每一秒触发一次检查任务;3.任务按下面步骤执行?先从数据库中查找 超时未支付 的订单;?依次执行定的 Cancel 操作;?将变更保存到数据库;
核心代码如下:
@Service @Slf4j public class DatabasePollStrategy { @Autowired private OrderInfoService orderInfoService; /** * 每隔 1S 运行一次
* 1. 从 DB 中查询过期未支付订单(状态为 CREATED,创建时间小于 deadLintDate) * 2. 依次执行 取消订单 操作 */ @Scheduled(fixedDelay = 1 * 1000) public void poll(){ Date now = new Date(); Date overtime = DateUtils.addMinutes(now, -30); List overtimeNotPaidOrders = orderInfoService.findOvertimeNotPaidOrders(overtime); log.info("load overtime Not paid orders {}", overtimeNotPaidOrders); overtimeNotPaidOrders.forEach(orderInfo -> this.orderInfoService.cancel(orderInfo.getId())); } }

2.2. 方案小结 1.优点:简单?开发简单。系统复杂性低,特别是在 Spring Schedule 帮助下;?测试简单。没有外部依赖,逻辑集中,方便快速定位问题;?上线简单。没有繁琐的配置,复杂的申请流程;2.缺点:?数据库负担重。不停的轮询,会加重数据库的负载;?时效性不足。任务最高延时为轮询时间,不适合时效要求高的场景(在订单场景已经足够);?存在大量无效轮询。在没有过期订单的情况下,出现大量的无效扫描;?没有消峰能力。短时间出现大量过期订单,会造成任务集中执行,出现明显的业务高峰;
总之,该方案非常适合业务量级小,业务迭代快的项目。
3. 单机内存解决方案
对于延时任务,JDK 为我们准备了大量工具,使用这些工具可以解决我们的问题。
3.1 DelayQueue
DelayQueue 是一种特殊的阻塞队列,可以为每个任务指定延时时间,只有在延时时间到达后,才能获取任务。
整体结构如下:
互联网架构|延时任务从入门到精通
文章图片

延时队列
核心流程如下:
1.用户下单完成后,向延时队列提交一个任务;2.时间达到后,后台工作线程从队列中读取任务;3.工作线程调用 CancelOrder 方法 对过期未支付的订单执行取消操作;
核心代码如下:
@Slf4j @Service public class DelayQueueStrategy implements SmartLifecycle { private final DelayQueue delayTasks = new DelayQueue<>(); private final Thread thread = new OrderCancelWorker(); private boolean running; @Autowired private OrderInfoService orderInfoService; @TransactionalEventListener public void onOrderCreated(OrderInfoCreateEvent event){ // 将 订单号 放入延时队列 this.delayTasks.offer(new DelayTask(event.getOrderInfo().getId(), 10)); log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId()); }/** * 启动后台线程,消费延时队列中的任务 */ @Override public void start() { if (this.running){ return; } this.thread.start(); this.running = true; }/** * 停止后台线程 */ @Override public void stop() { if (!this.running){ return; } this.thread.interrupt(); this.running = false; }@Override public boolean isRunning() { return this.running; }@Override public boolean isAutoStartup() { return true; }/** * 延时任务 */ @Value private static class DelayTask implements Delayed{ private final Long orderId; private final Date runAt; private DelayTask(Long orderId, int delayTime) { this.orderId = orderId; this.runAt = DateUtils.addSeconds(new Date(), delayTime); }/** * 获取剩余时间 * @param timeUnit * @return */ @Override public long getDelay(TimeUnit timeUnit) { return timeUnit.convert(getRunAt().getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }@Override public int compareTo(Delayed delayed) { if (delayed == this) { return 0; } else { long d = this.getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS); return d == 0L ? 0 : (d < 0L ? -1 : 1); } } }/** * 后台线程,消费延时队列中的消息 */ private class OrderCancelWorker extends Thread { @Override public void run() { // 根据中断状态,确定是否退出 while (!Thread.currentThread().isInterrupted()){ DelayTask task = null; try { // 从队列中获取任务 task = delayTasks.take(); } catch (InterruptedException e) { e.printStackTrace(); } // 取消订单 if (task != null){ orderInfoService.cancel(task.getOrderId()); log.info("Success to Run Delay Task, Cancel Order {}", task.getOrderId()); } } } } }

这个方案,思路非常简单,但是有一定的复杂性,需要对工作线程的生命周期进行手工维护。相对来说,JDK 已经为我们的这种场景进行了封装,也就是基于 DelayQueue 的 ScheduledExecutorService。
3.2 ScheduledExecutorService
ScheduledExecutorService 是基于 DelayQueue 构建的定时调度组件,相对之前的 Timer 有非常大的优势。
整体架构如下:
互联网架构|延时任务从入门到精通
文章图片

ScheduleExecutorService
核心流程如下:
1.用户下单完成后,向 ScheduledExecutorService 注册一个定时任务;2.时间达到后,ScheduledExecutorService 将启动任务;3.线程池线程调用 CancelOrder 方法 对过期未支付的订单执行取消操作;
核心代码如下:
@Slf4j @Service public class ScheduleExecutorStrategy { @Autowired private OrderInfoService orderInfoService; private ScheduledExecutorService scheduledExecutorService; public ScheduleExecutorStrategy(){ BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder() .namingPattern("Schedule-Cancel-Thread-%d") .daemon(true) .build(); this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, basicThreadFactory); }@TransactionalEventListener public void onOrderCreated(OrderInfoCreateEvent event){ // 添加定时任务 this.scheduledExecutorService.schedule(new CancelTask(event.getOrderInfo().getId()), 5, TimeUnit.SECONDS); log.info("Success to add cancel task for order {}", event.getOrderInfo().getId()); }private class CancelTask implements Runnable{ private final Long orderId; private CancelTask(Long orderId) { this.orderId = orderId; }@Override public void run() { // 执行订单取消操作 orderInfoService.cancel(this.orderId); log.info("Success to cancel task for order {}", this.orderId); } } }

相对 DelayQueue 方案,ScheduledExecutorService 代码量少了很多,避免了繁琐的细节。
3.3 小结 【互联网架构|延时任务从入门到精通】优点:
1.避免了对DB的轮询,降低 DB 的压力;2.整体方案简单,使用 JDK 组件完成,没有额外依赖;
缺点:
1.任务容易丢失。任务存储于内存中,服务重启或机器宕机,会造成内存任务丢失;2.单机策略,缺少集群能力。
为了解决 单机内存方案 的问题,我们需要引入分布式方案。
在单机内存方案中,除了 延时队列 实现外,还有一种 “时间轮” 方案,能够大幅降低内存消耗,有兴趣的伙伴可以研究一下。
4. 分布式延时队列方案
内存队列自身存在很多限制,在实际工作中,我们一般会引入分布式解决方案。
4.1 基于 Redis 延时队列
Redis 是最常用的基础设施,作为一个数据结构服务器,在丰富的数据结构帮助下,可以封装成多种高级结构,延时队列便是其中一种。
为了避免重复发明轮子,我们直接使用 Redisson 中的 延时队列。
整体架构与 DelayQueue 基本一致,只是将 内存延时队列 升级为 分布式延时队列,在此就不在论述。
首先,在 pom 中引入 Redisson 相关依赖
org.redisson redisson-spring-boot-starter 3.16.2

然后,在 application 配置文件中增加 redis 相关配置
spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.database=0

最后,就可以注入核心组件 RedissonClient 了
@Autowired private RedissonClient redissonClient;

流程整合后的代码如下:
@Slf4j @Service public class RDelayQueueStrategy implements SmartLifecycle { private boolean running; private Thread thread = new OrderCancelWorker(); private RBlockingQueue cancelOrderQueue; private RDelayedQueue delayedQueue; @Autowired private OrderInfoService orderInfoService; @Autowired private RedissonClient redissonClient; /** * 创建 Redis 队列 */ @PostConstruct public void init(){ this.cancelOrderQueue = redissonClient.getBlockingQueue("DelayQueueForCancelOrder"); this.delayedQueue = redissonClient.getDelayedQueue(cancelOrderQueue); }@TransactionalEventListener public void onOrderCreated(OrderInfoCreateEvent event){ this.delayedQueue.offer(event.getOrderInfo().getId(), 5L, TimeUnit.SECONDS); log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId()); }/** * 启动后台线程 */ @Override public void start() { if (this.running){ return; } thread.start(); this.running = true; }/** * 停止后台线程 */ @Override public void stop() { if (!this.running){ return; } thread.interrupt(); this.running = false; }@Override public boolean isRunning() { return this.running; }@Override public boolean isAutoStartup() { return true; }private class OrderCancelWorker extends Thread { @Override public void run() { // 根据中断状态,确定是否退出 while (!Thread.currentThread().isInterrupted()){ Long orderId = null; try { // 从队列中获取 订单号 orderId = cancelOrderQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } // 取消订单 if (orderId != null){ orderInfoService.cancel(orderId); log.info("Success to Run Delay Task, Cancel Order {}", orderId); } } } } }

这个方案非常简单,应用于大多数业务场景。但是,Redis 本身是遵循 AP 而非 CP 模型,在集群切换时会出现消息丢失的情况,所以对于一致性要求高的场景,建议使用 RocketMQ 方案。
4.2 基于 RocketMQ 延时队列
RocketMQ 是 阿里开源的分布式消息中间件,其整体设计从 Kafka 借鉴了大量思想,但针对业务场景增加了部分扩展,其中延时队列便是其中最为重要的一部分。
整体架构设计如下:
互联网架构|延时任务从入门到精通
文章图片

RocketMQ 延时队列
核心流程如下:
1.用户下单完成后,向 RocketMQ 提交一个消息;2.时间达到后,消费线程从工作队列中获取消息;3.消费线程解析消息后调用 CancelOrder 方法 对过期未支付的订单执行取消操作;
首先,需要增加 RocketMQ 相关依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1

然后,在 application 添加相关配置
rocketmq.name-server=http://127.0.0.1:9876 rocketmq.producer.group=delay-task-demo

最后,我们就可以使用 RocketMQTemplate 发送消息
@Autowired private RocketMQTemplate rocketMQTemplate;

注:RocketMQ 并不支持任意的时间,而是提供了几个固定的延时时间,一般情况下可以满足我们的业务需求,如果现有固定延时无法满足需求,可以通过多次投递的方式进行解决。比如,RocketMQ 最大支持 2H 延时,而业务需要延时 24H,只需在消息体中增加期望执行时间,获取消息后,如果尚未达到期望执行时间,将消息重新发送回延时队列;如果达到期望执行时间,则执行对于的任务。
发送延时消息:
@Service @Slf4j public class RocketMQBasedDelayStrategy { private static final String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; @Autowired private RocketMQTemplate rocketMQTemplate; @TransactionalEventListener public void onOrderCreated(OrderInfoCreateEvent event){ // 将数据 发送至 RocketMQ 的延时队列 Message message = MessageBuilder .withPayload(String.valueOf(event.getOrderInfo().getId())) .build(); this.rocketMQTemplate.syncSend("delay-task-topic", message, 200, 2); log.info("success to sent Delay Task to RocketMQ for Cancel Order {}", event.getOrderInfo().getId()); } }

构建 Consumer 消费消息
@Service @Slf4j @RocketMQMessageListener(topic = "delay-task-topic", consumerGroup = "delay-task-consumer-group") public class RocketMQBasedDelayTaskConsumer implements RocketMQListener { @Autowired private OrderInfoService orderInfoService; /** * 接收消息回调,执行取消订单操作 * @param message */ @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String idAsStr = new String(body); orderInfoService.cancel(Long.valueOf(idAsStr)); } }

4.3 小结 一般互联网公司都会使用 RocketMQ 方案来解决延时问题。
优点,主要来自于分布式服务特性:
1.高性能。作为削峰填谷的利器,发送端、服务器、消费端都提供较高性能;2.高可用。Redis、RocketMQ 都提供了丰富的部署模式,是高可用的基础;3.可扩展。Redis、RocketMQ 集群具有良好的扩展能力;
缺点:
1.需要中间支持。首先,需要基础设施的支持,Redis、RocketMQ 都会增加运维成本;2.需要学习新的 API。需要掌握新的 API,增加学习成本,使用不当还可能出现问题;
5. 声明式编程
架构设计中有一个非常重要的原则:有效分离技术和业务,避免两者的相互影响。
5.1 声明式编程
声明式编程(英语:Declarative programming)是一种编程范式,与命令式编程相对立。它描述目标的性质,让计算机明白目标,而非流程。声明式编程不用告诉计算机问题领域,从而避免随之而来的副作用。而命令式编程则需要用算法来明确的指出每一步该怎么做。
每引入一个中间件,研发人员都需要学习一套新的API,如何有效降低接入成本是一个巨大的挑战,而最常用的重要手段之一就是:声明式编程。
简单来说,就是将能力抽象化,使其能够通过配置的方式灵活的应用于需要的场景。
首先,让我们先看下最终的效果:
@Service @Slf4j public class RocketMQBasedDelayService { @Autowired private OrderInfoService orderInfoService; /** * 通过 RocketMQBasedDelay 指定方法为延时方法,该 注解做两件事:
* 1. 基于 AOP 技术,拦截对 cancelOrder 的调用,将参数转为为 Message, 并发送到 RocketMQ 的延时队列 * 2. 针对 cancelOrder 方法,创建 DefaultMQPushConsumer 并订阅相关消息,进行消息处理 * @param orderId */ @RocketMQBasedDelay(topic = "delay-task-topic-ann", delayLevel = 2, consumerGroup = "CancelOrderGroup") public void cancelOrder(Long orderId){ if (orderId == null){ log.info("param is invalidate"); return; } this.orderInfoService.cancel(orderId); log.info("success to cancel Order for {}", orderId); } }

相比于普通方法,增加 @RocketMQBasedDelay 便可以赋予方法延时能力,这便是“声明式编程”的威力
1.首先在方法上添加 @RocketMQBasedDelay 注解,配置延时队列名称,延时时间,消费者信息;2.当方法被调用时,并不会直接执行,而是将请求转发给 RocketMQ 的延时队列,然后直接返回;3.当到达消息延时时间时,Consumer 从 延时队列中获取消息,并调用 cancelOrder 方法来处理业务流程。
使用这种方式,大大减少了接入成本,降低了出错的概率。
5.2 核心设计 核心设计如下:
互联网架构|延时任务从入门到精通
文章图片

RocketMQBasedDelay
在启动时,增加了两个扩展点:
1.扫描 @RocketMQBasedDelay 注解方法,为方法增加 SendMessageInterceptor 拦截器;2.扫描 @RocketMQBasedDelay 注解方法,生成 RocketMQConsumerContainer 托管对象,并完成 DefaultMQPushConsumer 的配置和启动;
具体的执行流程如下:
1.当方法被调用时,调用被 SendMessageInterceptor 拦截,从而改变原有执行规则,新的流程如下:?从 @RocketMQBasedDelay 获取相关的配置参数;?对请求参数进行序列化处理;?使用 RocketMQTemplate 发送延时消息;?直接返回,中断原有方法调用;2.当延时时间到达时,RocketMQConsumerContainer 中的 DefaultMQPushConsumer 会获取到消息进行业务处理:?反序列化调用参数;?调用业务方法;?返回消费状态;
5.3 核心实现 核心组件,主要分为两类:
1.工作组件。?SendMessageInterceptor。拦截请求,将请求转发至 RocketMQ 的延时队列;?RocketMQConsumerContainer。对 DefaultMQPushConsumer 的封装,主要完成 Consumer 的配置,注册监听器,消息到达后触发任务的执行;2.配置组件。?RocketMQConsumerContainerRegistry。对 Spring 容器中的 Bean 进行扫描,将@RocketMQBasedDelay注解的方法封装成 RocketMQConsumerContainer,并注册到 Spring 容器中;?RocketMQBasedDelayConfiguration。向 Spring 容器注册 AOP 拦截器 和 RocketMQConsumerContainerRegistry;
RocketMQBasedDelay 注解如下:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RocketMQBasedDelay { /** * RocketMQ topic * @return */ String topic(); /** * 延时级别 * @return */ int delayLevel(); /** * 消费者组信息 * @return */ String consumerGroup(); }

该注解可以放置在方法之上,并在 运行时 生效。
SendMessageInterceptor 核心代码如下:
/** * 拦截方法调用,并将请求封装成 Message 发送至 RocketMQ 的 Topic */ @Slf4j public class SendMessageInterceptor implements MethodInterceptor { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { Method method = methodInvocation.getMethod(); // 1. 获取 方法上的注解信息 RocketMQBasedDelay rocketMQBasedDelay = method.getAnnotation(RocketMQBasedDelay.class); // 2. 将请求参数 转换为 MQ Object[] arguments = methodInvocation.getArguments(); String argData = https://www.it610.com/article/serialize(arguments); Message message = MessageBuilder .withPayload(argData) .build(); // 3. 发送 MQ this.rocketMQTemplate.syncSend(rocketMQBasedDelay.topic(), message , 200, rocketMQBasedDelay.delayLevel()); log.info("success to sent Delay Task to RocketMQ for {}", Arrays.toString(arguments)); return null; }private String serialize(Object[] arguments) { Map result = Maps.newHashMapWithExpectedSize(arguments.length); for (int i = 0; i < arguments.length; i++){ result.put(String.valueOf(i), SerializeUtil.serialize(arguments[i])); } return SerializeUtil.serialize(result); }}

RocketMQConsumerContainer 源码如下:
/** * Consumer 容器,用于对 DefaultMQPushConsumer 的封装 */ @Data @Slf4j public class RocketMQConsumerContainer implements InitializingBean, SmartLifecycle { private DefaultMQPushConsumer consumer; private boolean running; private String consumerGroup; private String nameServerAddress; private String topic; private Object bean; private Method method; @Override public boolean isAutoStartup() { return true; }@Override public void start() { if (this.running){ return; } try { this.consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } this.running = true; }@Override public void stop() { this.running = false; this.consumer.shutdown(); }@Override public boolean isRunning() { return running; }@Override public void afterPropertiesSet() throws Exception { // 构建 DefaultMQPushConsumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup(this.consumerGroup); consumer.setNamesrvAddr(this.nameServerAddress); // 订阅 topic consumer.subscribe(topic, "*"); // 增加拦截器 consumer.setMessageListener(new DefaultMessageListenerOrderly()); this.consumer = consumer; }private class DefaultMessageListenerOrderly implements MessageListenerOrderly {@Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); // 从 Message 中反序列化数据,获得方法调用参数 byte[] body = messageExt.getBody(); String bodyAsStr = new String(body); Map deserialize = SerializeUtil.deserialize(bodyAsStr, Map.class); Object[] params = new Object[method.getParameterCount()]; for (int i = 0; i< method.getParameterCount(); i++){ String o = (String)deserialize.get(String.valueOf(i)); if (o == null){ params[i] = null; }else { params[i] = SerializeUtil.deserialize(o, method.getParameterTypes()[i]); } }// 执行业务方法 method.invoke(bean, params); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setSuspendCurrentQueueTimeMillis(1000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } }return ConsumeOrderlyStatus.SUCCESS; } } }

RocketMQConsumerContainerRegistry 源码如下:
/** * 基于 BeanPostProcessor#postProcessAfterInitialization 对每个 bean 进行处理 * 扫描 bean 中被 @RocketMQBasedDelay 注解的方法,并将方法封装成 RocketMQConsumerContainer, * 以启动 DefaultMQPushConsumer */ public class RocketMQConsumerContainerRegistry implements BeanPostProcessor { private final AtomicInteger id = new AtomicInteger(1); @Autowired private GenericApplicationContext applicationContext; @Value("${rocketmq.name-server}") private String nameServerAddress; /** * 对每个 bean 依次进行处理 * @param bean * @param beanName * @return * @throws BeansException */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 1. 获取 @RocketMQBasedDelay 注解方法 Class targetCls = AopUtils.getTargetClass(bean); List methodsListWithAnnotation = MethodUtils.getMethodsListWithAnnotation(targetCls, RocketMQBasedDelay.class); // 2. 为每个 @RocketMQBasedDelay 注解方法 注册 RocketMQConsumerContainer for(Method method : methodsListWithAnnotation){ String containerBeanName = targetCls.getName() + "#" + method.getName() + id.getAndIncrement(); RocketMQBasedDelay annotation = method.getAnnotation(RocketMQBasedDelay.class); applicationContext.registerBean(containerBeanName, RocketMQConsumerContainer.class, () -> createContainer(bean, method, annotation)); }return bean; }/** * 构建 RocketMQConsumerContainer * @param proxy * @param method * @param annotation * @return */ privateRocketMQConsumerContainer createContainer(Object proxy, Method method, RocketMQBasedDelay annotation) { Object bean = AopProxyUtils.getSingletonTarget(proxy); RocketMQConsumerContainer container = new RocketMQConsumerContainer(); container.setBean(bean); container.setMethod(method); container.setConsumerGroup(annotation.consumerGroup()); container.setNameServerAddress(nameServerAddress); container.setTopic(annotation.topic()); return container; } }

RocketMQBasedDelayConfiguration 源码如下:
@Configuration public class RocketMQBasedDelayConfiguration {/** * 声明 RocketMQConsumerContainerRegistry,扫描 RocketMQBasedDelay 方法, * 创建 DefaultMQPushConsumer 并完成注册 * @return */ @Bean public RocketMQConsumerContainerRegistry rocketMQConsumerContainerRegistry(){ return new RocketMQConsumerContainerRegistry(); }/** * 声明 AOP 拦截器 * 在调用 @RocketMQBasedDelay 注解方法时,自动拦截,将请求发送至 RocketMQ * @return */ @Bean public SendMessageInterceptor messageSendInterceptor(){ return new SendMessageInterceptor(); }/** * 对 @RocketMQBasedDelay 标注方法进行拦截 * @param sendMessageInterceptor * @return */ @Bean public PointcutAdvisor pointcutAdvisor(@Autowired SendMessageInterceptor sendMessageInterceptor){ return new DefaultPointcutAdvisor(new AnnotationMatchingPointcut(null, RocketMQBasedDelay.class), sendMessageInterceptor); } }

5.4 小结 声明式编程,在设计时会有比较明显的门槛,但这种代价换来的是 使用上的便利性。这种一次性投入,多次创造价值的做法,非常推荐应用,大大提升研发效率、降低错误出现概率。
6. 小结
本文,以自动对超时未支付订单执行取消操作为业务场景,先后介绍了
1.DB 轮询方案;2.基于延时队列和ScheduleExecutorService的单机内存方案;3.基于 Redis 和 RocketMQ 的分布式延时队列方案;
并详细阐述了各个方案优缺点,希望各位伙伴能在实际开发中根据业务场景选择最优解决方案。
最后,对“声明式编程”进行了简单介绍,通过技术手段降低接入成本。
按照惯例,附上源码 源码

    推荐阅读