一、应用场景
【手写一个基于redis的消息队列】首先 我先引入一个大家熟知的观点:Redis可以作为消息队列来使用。
我们在开发过?程中,redis用的并不少,但是我想大部分我们都只把redis当作缓存使用,涉及到的数据结构也不会太多,其实redis的数据结构是十分精妙的,而所说的基于redis来写一个消息队列,就是基于redis的list(列表结构)。redis作为消息队列有两种模式,一种是发布-订阅模式,一种是生产者-消费者模式,本文主要讲的是后者。
二、关于redis的list
如下图所示,redis的list底层结构其实是一个双向链表,每个listNode节点都保存有prev和next指针,来指向他的前驱和后继节点,所以这个数据结构的功能是十分强大的:左边进右边出就是一个队列,左边进左边出就是一个栈......而redis的消息队列其实就是前一种情况。
文章图片
三、代码结构
首先来看一下生产者的实现:produce方法就是push消息的核心方法,实现十分简单,不作赘述。值得一提的是,每次有消息推入队列时这边都会将对应消费者唤醒,这也算是做的一个优化点,具体的下文会讲。
public class DefaultProducer implements Producer {
@Autowired
private RedisTemplate redisTemplate;
/**
* 最大消息堆积数,默认10000条,暂时没用,后期用于消息数量限制,防止消息无限堆积
*/
private Integer MAX_MESSAGE_SIZE;
private static Integer DEFAULT_MAX_MESSAGE_SIZE = 10000;
//当前消费者线程
private static volatile ConcurrentHashMap currentConsumeThreads = new ConcurrentHashMap<>();
public DefaultProducer(){
this(DEFAULT_MAX_MESSAGE_SIZE);
}public DefaultProducer(Integer maxMessageSize){
this.MAX_MESSAGE_SIZE = maxMessageSize;
}
@Override
public void produce(String key, T message) {
Assert.notNull(key,"queue's name can not be null!");
try{
redisTemplate.opsForList().leftPush(key, JSON.toJSONString(message));
if(currentConsumeThreads.containsKey("消费者线程"+key)){
LockSupport.unpark(currentConsumeThreads.get("消费者线程"+key));
log.info("消费者线程:{} -> 唤醒",currentConsumeThreads.get("消费者线程"+key).getName());
}
}catch (Exception e){
log.error("消息推送失败,路由key:{},message:{}",key,JSON.toJSONString(message));
}}/**
* 设置当前消费者线程
* @param t 当前消费者线程
*/
public static void setCurrentConsumerThread(Thread t){
synchronized (DefaultProducer.class){
currentConsumeThreads.put(t.getName(),t);
}
}}
来看一下消费者的实现:首先我这边有提供一个默认的消费者基类,来支撑核心功能,所有自定义的消费者必须实现此类而不必关心细节。从下面代码可以看到,基类的队列路由key是由他的子类所提供的,只要在子类上面加上@MessageHandler注解,注解里提供队列名即可,要注意的是,子类必须是一个被spring管理的类。
再来看一下消费者的执行流程,基类会为每个自定义的消费者创建一个线程,这个线程用while循环无限监听消息,但是我们知道死循环是一个很耗性能的操作,尤其是没有消息的时候就完全是做无用功,所以这里在取不到消息的时候会将当前线程挂起,然后前面所提到的生产者对这个队列推送消息时,消费者线程会被唤醒,大大提高了性能。
public abstract class BaseConsumer implements Consumer{
/**
* 消息key
*/
private String key;
private Thread worker;
@Autowired
private RedisTemplate redisTemplate;
public BaseConsumer(){
this.key = this.getClass().getAnnotation(MessageHandler.class).key();
}@PostConstruct
public void start(){
init();
}@Override
public void consume(Object message){
throw new UnsupportedOperationException();
}public final void init(){
if(worker == null){
worker = new Thread(()->{
//暂时先写成死循环 ,但是在没有消息取的时候会造成空转,后期优化(以优化)
while(true){
Object message = redisTemplate.opsForList().rightPop(key);
if (message == null){
//没有消息则将当前线程挂起,避免循环空转
log.info("当前消费者线程:{},未取到消息",Thread.currentThread().getName());
DefaultProducer.setCurrentConsumerThread(worker);
LockSupport.park();
}
consume(message);
}});
worker.start();
}
}}
四、关于使用
第一步:注入生产者,推送自定义消息
文章图片
第二步:实现对应key的自定义消费者,注解里的就是你要监听的key
文章图片
第三步:用postman工具测试
文章图片
推荐阅读
- Redis|redis原理之布隆过滤器(Bloom Filter)
- redis安装与基本使用
- java|图解四种 IO 模型
- Redis|Redis性能解析--Redis为什么那么快()
- java|你跳一次涨多少(今天见识到跳槽天花板!!)
- java|送你一份大厂都这么解决Redis缓存问题,面试官必问!
- (免费领取红包封面)【Redis 系列】redis 学习四,set 集合,hash 哈希,zset 有序集合初步认知
- redis优化(bigkey、hotkey)
- redis高可用(主从、哨兵、集群)
- 【Redis 系列】redis 学习四,set 集合,hash 哈希,zset 有序集合初步认知