延迟队列

使用场景

  1. 支付订单成功后,按照【4m,10m,10m,1h,2h,6h,15h】的阶梯时间间隔,将支付结果通知给接入方。
    (推荐Redis实现方式)
  2. 某笔申请操作长时间未处理或未审批,需要在一段时间后置为失效。
    (推荐定时任务+Redis实现方式)
实现方式 延迟队列有定时任务轮询数据库、DelayQueue、时间轮算法、消息中间件(RabbitMQ,RocketMQ,Kafka)、Redis等几种实现方式。
定时任务 使用说明
?使用ScheduledExecutorService,延迟执行。
ScheduledExecutorService executor =new ScheduledThreadPoolExecutor(3); executor.schedule(() ->System.out.println("do something"), 2, TimeUnit.SECONDS);

?使用quarz或saturn,配置频率为每秒一次的定时任务,对业务订单表进行轮训判断,订单到点则执行。
优缺点
  • 优点:实现简单,各业务模块可自行定义延迟执行规则。
  • 缺点:完全由业务代码控制,重复代码多,且需要频繁访问数据库。
    适用于系统中只有个别业务场景需要使用延迟队列的情况。
DelayQueue 使用说明
DelayQueue介绍
DelayQueue是一个BlockingQueue(无界阻塞)队列,位于java.util.concurrent包下,它本质就是封装了一个PriorityQueue(优先队列),来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。
队中元素需要实现Delayed接口,其中getDelay方法用于设置延期时间,compareTo方法用于对队列中的元素进行排序。
入队:
??put(),线程安全。
出队:
??poll()为非阻塞获取,没有到期的元素直接返回null;
??take()为阻塞方式获取,没有到期的元素线程将会等待。
使用示例
public class Order implements Delayed { /** * 延迟时间 */ @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") private long time; /** * 订单号 */ String orderNo; public Order(String orderNo, long time, TimeUnit unit) { this.orderNo = orderNo; this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0); } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { Order order = (Order) o; long diff = this.time - order.time; if (diff <= 0) { return -1; } else { return 1; } } }

Order Order = new Order("Order1", 5, TimeUnit.SECONDS); DelayQueue delayQueue = new DelayQueue<>(); delayQueue.put(Order); // 取队列头部元素,非阻塞 Order task = delayQueue.poll(); // 阻塞方式 Order task = delayQueue.take();

优缺点
  • 优点:JDK自带,使用简单轻巧。
  • 缺点:不支持分布式运行和任务持久化。
    适用于不需要持久化的单机任务处理。
时间轮算法 【延迟队列】时间轮是一种高效利用线程资源,进行批量调度的调度模型。应用场景包括 Netty、Quartz、ZooKeeper 、Kafka等。
使用说明
以netty中HashedWheelTimer为例。
延迟队列
文章图片

使用示例
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2); TimerTask task1 = new TimerTask() { public void run(Timeout timeout) throws Exception { System.out.println(new Date(System.currentTimeMillis()) + ":task1执行 "); } }; timer.newTimeout(task1, 5, TimeUnit.SECONDS);

优缺点
  • 优点:只需要一个线程去推进时间轮,查询添加效率高。
  • 缺点:内存占用相对较高,任务有较大耗时时会影响时间轮的正确性,不支持分布式运行和任务持久化。
    适用于需要处理大量定时任务(处理耗时短)的情况,如链接超时管理等。
消息中间件 使用说明
  • RabbitMQ
    基于RabbitMQ中的TTL和DXL这两个属性可间接实现延迟队列。
    ??Time To Live(TTL)指的是消息的存活时间。
    ??Dead Letter Exchanges(DLX)即死信交换机。
    RabbitMQ的Queue可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key,一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
  • RocketMQ
    rocketmq先把消息按照延迟时间段发送到指定的队列中,然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列。
    延迟队列
    文章图片
使用示例
  • RabbitMQ
    发送消息时指定消息延迟的时间:
    public void send(String delayTimes) { amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> { // 设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }); } }

    设置延迟队列出现死信后的转发规则:
    @Bean(name = "order.delay.queue") public Queue getMessageQueue() { return QueueBuilder .durable(RabbitConstant.DEAD_LETTER_QUEUE) // 配置到期后转发的交换 .withArgument("x-dead-letter-exchange", "order.close.exchange") // 配置到期后转发的路由键 .withArgument("x-dead-letter-routing-key", "order.close.queue") .build(); }

  • RocketMQ
    // 生产者发送消息时设置消息延时级别,3对应10秒后发送 msg.setDelayTimeLevel(3);

    注意:RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】。
优缺点
  • 优点:基于消息中间件可以很快实现延迟队列,且天然支持消息消费的有序性、消息持久化、ACK机制等。
  • 缺点:没有接入RabbitMQ、RocketMQ的团队,需要额外引入,增加了部署和运维成本。
    适用于团队已经接入使用RabbitMQ、RocketMQ的情况。
Kafka+DelayQueue
由于Kafka不支持延迟队列和死信队列,如果要基于Kafka实现延迟队列,可以参考RocketMQ的思路。
生产者先投递到Kafka 内部的主题(delay_topic),然后通过一个自定义的服务(DelayService )拉取这些内部主题中的消息,使用DelayQueue进行暂存后,再转发到真实主题中(real_topic)。
延迟队列
文章图片

延迟队列可以采用1个topic,由DelayService拉取后统一转发。也可以按照延迟级别,定义固定个topic,比如像RocketMQ定义【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】共18个topic。
Redis 使用说明
  • 基于Redis过期回调实现
    将任务添加到redis,然后通过监听redis过期回调事件,对到期的任务进行处理。
    @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); System.out.println("监听到key:" + expiredKey + "已过期"); } }

    @Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }

  • 基于Redis中sorted set实现
    延迟队列
    文章图片

    -- 把的时间戳作为score,添加到redis redis> ZADD delayqueue "001" -- 取出所有已经“就绪”的任务,并删除任务 redis> MULTI redis> ZRANGEBYSCORE delayqueue 0 redis> ZREMRANGEBYSCORE delayqueue 0 redis> EXEC

优缺点
  • 优点:基于redis完成了任务持久化,且支持分布式运行。
  • 缺点:
    ??缺乏队列顺序消费特性,相同score的任务无法顺序执行。需要基于Redis的list(lpush + rpop)或pub/sub机制开发队列特性。
    ??缺乏ACK机制,若任务处理失败,队列中的任务已经删除,不能再回退。需要维护两个队列来实现ACK机制。
适用于任务不需要顺序执行的情况,如通知类的业务场景。
实现方式选型 根据团队目前情况,延迟队列推荐基于redis和kafka+DelayQueue来实现,如对任务消息投递的可靠性有强要求,可辅以轮询数据库的方式作为补偿。
基于redis实现的功能点
  1. 入队
    业务方push时,使用Redisson延迟队列RDelayedQueue(元素进zset,score到期后由定时任务移交到目标队列)
    Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.99.100:6379"); RedissonClient redisson = Redisson.create(config); RBlockingQueue blockingQueue = redisson.getBlockingQueue("delay_queue"); RDelayedQueue delayedQueue = redisson.getDelayedQueue(blockingQueue); delayedQueue.offer("demo", 10, TimeUnit.SECONDS);

  2. 出队
    定义线程池,不停取delayedQueue中的元素,取出则调用业务方回调方法。
    RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue("delay_queue"); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); String message = blockingFairQueue.take();

    推荐阅读