手写一个基于redis的消息队列

一、应用场景
【手写一个基于redis的消息队列】首先 我先引入一个大家熟知的观点:Redis可以作为消息队列来使用。
我们在开发过?程中,redis用的并不少,但是我想大部分我们都只把redis当作缓存使用,涉及到的数据结构也不会太多,其实redis的数据结构是十分精妙的,而所说的基于redis来写一个消息队列,就是基于redis的list(列表结构)。redis作为消息队列有两种模式,一种是发布-订阅模式,一种是生产者-消费者模式,本文主要讲的是后者。

二、关于redis的list
如下图所示,redis的list底层结构其实是一个双向链表,每个listNode节点都保存有prev和next指针,来指向他的前驱和后继节点,所以这个数据结构的功能是十分强大的:左边进右边出就是一个队列,左边进左边出就是一个栈......而redis的消息队列其实就是前一种情况。
手写一个基于redis的消息队列
文章图片


三、代码结构

首先来看一下生产者的实现:produce方法就是push消息的核心方法,实现十分简单,不作赘述。值得一提的是,每次有消息推入队列时这边都会将对应消费者唤醒,这也算是做的一个优化点,具体的下文会讲。

public class DefaultProducer implements Producer { @Autowired private RedisTemplate redisTemplate; /** * 最大消息堆积数,默认10000条,暂时没用,后期用于消息数量限制,防止消息无限堆积 */ private Integer MAX_MESSAGE_SIZE; private static Integer DEFAULT_MAX_MESSAGE_SIZE = 10000; //当前消费者线程 private static volatile ConcurrentHashMap currentConsumeThreads = new ConcurrentHashMap<>(); public DefaultProducer(){ this(DEFAULT_MAX_MESSAGE_SIZE); }public DefaultProducer(Integer maxMessageSize){ this.MAX_MESSAGE_SIZE = maxMessageSize; } @Override public void produce(String key, T message) { Assert.notNull(key,"queue's name can not be null!"); try{ redisTemplate.opsForList().leftPush(key, JSON.toJSONString(message)); if(currentConsumeThreads.containsKey("消费者线程"+key)){ LockSupport.unpark(currentConsumeThreads.get("消费者线程"+key)); log.info("消费者线程:{} -> 唤醒",currentConsumeThreads.get("消费者线程"+key).getName()); } }catch (Exception e){ log.error("消息推送失败,路由key:{},message:{}",key,JSON.toJSONString(message)); }}/** * 设置当前消费者线程 * @param t 当前消费者线程 */ public static void setCurrentConsumerThread(Thread t){ synchronized (DefaultProducer.class){ currentConsumeThreads.put(t.getName(),t); } }}

来看一下消费者的实现:首先我这边有提供一个默认的消费者基类,来支撑核心功能,所有自定义的消费者必须实现此类而不必关心细节。从下面代码可以看到,基类的队列路由key是由他的子类所提供的,只要在子类上面加上@MessageHandler注解,注解里提供队列名即可,要注意的是,子类必须是一个被spring管理的类。
再来看一下消费者的执行流程,基类会为每个自定义的消费者创建一个线程,这个线程用while循环无限监听消息,但是我们知道死循环是一个很耗性能的操作,尤其是没有消息的时候就完全是做无用功,所以这里在取不到消息的时候会将当前线程挂起,然后前面所提到的生产者对这个队列推送消息时,消费者线程会被唤醒,大大提高了性能。

public abstract class BaseConsumer implements Consumer{ /** * 消息key */ private String key; private Thread worker; @Autowired private RedisTemplate redisTemplate; public BaseConsumer(){ this.key = this.getClass().getAnnotation(MessageHandler.class).key(); }@PostConstruct public void start(){ init(); }@Override public void consume(Object message){ throw new UnsupportedOperationException(); }public final void init(){ if(worker == null){ worker = new Thread(()->{ //暂时先写成死循环 ,但是在没有消息取的时候会造成空转,后期优化(以优化) while(true){ Object message = redisTemplate.opsForList().rightPop(key); if (message == null){ //没有消息则将当前线程挂起,避免循环空转 log.info("当前消费者线程:{},未取到消息",Thread.currentThread().getName()); DefaultProducer.setCurrentConsumerThread(worker); LockSupport.park(); } consume(message); }}); worker.start(); } }}

四、关于使用
第一步:注入生产者,推送自定义消息
手写一个基于redis的消息队列
文章图片


第二步:实现对应key的自定义消费者,注解里的就是你要监听的key
手写一个基于redis的消息队列
文章图片


第三步:用postman工具测试
手写一个基于redis的消息队列
文章图片

    推荐阅读