使用场景
- 支付订单成功后,按照【4m,10m,10m,1h,2h,6h,15h】的阶梯时间间隔,将支付结果通知给接入方。
(推荐Redis实现方式) - 某笔申请操作长时间未处理或未审批,需要在一段时间后置为失效。
(推荐定时任务+Redis实现方式)
定时任务 使用说明
?使用ScheduledExecutorService,延迟执行。
ScheduledExecutorService executor =new ScheduledThreadPoolExecutor(3);
executor.schedule(() ->System.out.println("do something"), 2, TimeUnit.SECONDS);
?使用quarz或saturn,配置频率为每秒一次的定时任务,对业务订单表进行轮训判断,订单到点则执行。
优缺点
- 优点:实现简单,各业务模块可自行定义延迟执行规则。
- 缺点:完全由业务代码控制,重复代码多,且需要频繁访问数据库。
适用于系统中只有个别业务场景需要使用延迟队列的情况。
DelayQueue介绍队中元素需要实现Delayed接口,其中getDelay方法用于设置延期时间,compareTo方法用于对队列中的元素进行排序。
DelayQueue是一个BlockingQueue(无界阻塞)队列,位于java.util.concurrent包下,它本质就是封装了一个PriorityQueue(优先队列),来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。
入队:
??put(),线程安全。
出队:
??poll()为非阻塞获取,没有到期的元素直接返回null;
??take()为阻塞方式获取,没有到期的元素线程将会等待。
使用示例
public class Order implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
/**
* 订单号
*/
String orderNo;
public Order(String orderNo, long time, TimeUnit unit) {
this.orderNo = orderNo;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
long diff = this.time - order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
Order Order = new Order("Order1", 5, TimeUnit.SECONDS);
DelayQueue delayQueue = new DelayQueue<>();
delayQueue.put(Order);
// 取队列头部元素,非阻塞
Order task = delayQueue.poll();
// 阻塞方式
Order task = delayQueue.take();
优缺点
- 优点:JDK自带,使用简单轻巧。
- 缺点:不支持分布式运行和任务持久化。
适用于不需要持久化的单机任务处理。
使用说明
以netty中HashedWheelTimer为例。
文章图片
使用示例
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println(new Date(System.currentTimeMillis()) + ":task1执行 ");
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
优缺点
- 优点:只需要一个线程去推进时间轮,查询添加效率高。
- 缺点:内存占用相对较高,任务有较大耗时时会影响时间轮的正确性,不支持分布式运行和任务持久化。
适用于需要处理大量定时任务(处理耗时短)的情况,如链接超时管理等。
- RabbitMQ
基于RabbitMQ中的TTL和DXL这两个属性可间接实现延迟队列。
??Time To Live(TTL)指的是消息的存活时间。
??Dead Letter Exchanges(DLX)即死信交换机。
RabbitMQ的Queue可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key,一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。 - RocketMQ
rocketmq先把消息按照延迟时间段发送到指定的队列中,然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列。
文章图片
- RabbitMQ
发送消息时指定消息延迟的时间:
public void send(String delayTimes) { amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> { // 设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }); } }
设置延迟队列出现死信后的转发规则:
@Bean(name = "order.delay.queue") public Queue getMessageQueue() { return QueueBuilder .durable(RabbitConstant.DEAD_LETTER_QUEUE) // 配置到期后转发的交换 .withArgument("x-dead-letter-exchange", "order.close.exchange") // 配置到期后转发的路由键 .withArgument("x-dead-letter-routing-key", "order.close.queue") .build(); }
- RocketMQ
// 生产者发送消息时设置消息延时级别,3对应10秒后发送 msg.setDelayTimeLevel(3);
注意:RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】。
- 优点:基于消息中间件可以很快实现延迟队列,且天然支持消息消费的有序性、消息持久化、ACK机制等。
- 缺点:没有接入RabbitMQ、RocketMQ的团队,需要额外引入,增加了部署和运维成本。
适用于团队已经接入使用RabbitMQ、RocketMQ的情况。
由于Kafka不支持延迟队列和死信队列,如果要基于Kafka实现延迟队列,可以参考RocketMQ的思路。
生产者先投递到Kafka 内部的主题(delay_topic),然后通过一个自定义的服务(DelayService )拉取这些内部主题中的消息,使用DelayQueue进行暂存后,再转发到真实主题中(real_topic)。
文章图片
延迟队列可以采用1个topic,由DelayService拉取后统一转发。也可以按照延迟级别,定义固定个topic,比如像RocketMQ定义【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】共18个topic。
Redis 使用说明
- 基于Redis过期回调实现
将任务添加到redis,然后通过监听redis过期回调事件,对到期的任务进行处理。
@Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); System.out.println("监听到key:" + expiredKey + "已过期"); } }
@Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
- 基于Redis中sorted set实现
文章图片
-- 把的时间戳作为score,添加到redis redis> ZADD delayqueue
"001" -- 取出所有已经“就绪”的任务,并删除任务 redis> MULTI redis> ZRANGEBYSCORE delayqueue 0 redis> ZREMRANGEBYSCORE delayqueue 0 redis> EXEC
- 优点:基于redis完成了任务持久化,且支持分布式运行。
- 缺点:
??缺乏队列顺序消费特性,相同score的任务无法顺序执行。需要基于Redis的list(lpush + rpop)或pub/sub机制开发队列特性。
??缺乏ACK机制,若任务处理失败,队列中的任务已经删除,不能再回退。需要维护两个队列来实现ACK机制。
实现方式选型 根据团队目前情况,延迟队列推荐基于redis和kafka+DelayQueue来实现,如对任务消息投递的可靠性有强要求,可辅以轮询数据库的方式作为补偿。
基于redis实现的功能点
- 入队
业务方push时,使用Redisson延迟队列RDelayedQueue(元素进zset,score到期后由定时任务移交到目标队列)
Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.99.100:6379"); RedissonClient redisson = Redisson.create(config); RBlockingQueue blockingQueue = redisson.getBlockingQueue("delay_queue"); RDelayedQueue delayedQueue = redisson.getDelayedQueue(blockingQueue); delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
- 出队
定义线程池,不停取delayedQueue中的元素,取出则调用业务方回调方法。
RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue("delay_queue"); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); String message = blockingFairQueue.take();