java|spring boot + Redis实现消息队列-生产消费者

实现思路: 【java|spring boot + Redis实现消息队列-生产消费者】Redis本身提供了一个发布/订阅模式,但生产消费者模式需要我们自己去实现。

  1. 利用Redis中的队列,将新消息放入名称为xx的队列末尾,完成消息生产者。
  2. 启动一个线程,使用brpop命令循环从xx队列取第一个元素,获得消息,调用注册的消费者执行业务逻辑。
redis 集成:
org.springframework.boot spring-boot-starter-data-redis

配置RedisTemplate
/** * redis配置 * */ @Configuration public class RedisConfig {/** * retemplate相关配置 * @param factory * @return */ @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) {RedisTemplate template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(om); // 值采用json序列化 template.setValueSerializer(jacksonSeial); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); // 设置hash key 和value序列化模式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(jacksonSeial); template.afterPropertiesSet(); return template; }/** * 对hash类型的数据操作 * * @param redisTemplate * @return */ @Bean public HashOperations hashOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForHash(); }/** * 对redis字符串类型数据操作 * * @param redisTemplate * @return */ @Bean public ValueOperations valueOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForValue(); }/** * 对链表类型的数据操作 * * @param redisTemplate * @return */ @Bean public ListOperations listOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForList(); }/** * 对无序集合类型的数据操作 * * @param redisTemplate * @return */ @Bean public SetOperations setOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForSet(); }/** * 对有序集合类型的数据操作 * * @param redisTemplate * @return */ @Bean public ZSetOperations zSetOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForZSet(); }/** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter webMessage) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat的通道 container.addMessageListener(webMessage, new PatternTopic("webMessage")); return container; }@Bean MessageListenerAdapter webMessage(MessageReceiver messageReceiver) {//给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //不填defaultListenerMethod默认调用handleMessage return new MessageListenerAdapter(messageReceiver, "receiveMessage"); }/** * 配置redis消息队列消费者容器 * * @param redisTemplate redis * @return 消费者容器 */ @Autowired NormalMessageReceiver normalMessageReceiver; @Bean(initMethod = "init", destroyMethod = "destroy") public RedisMqConsumerContainer redisMqConsumerContainer(RedisTemplate redisTemplate) { RedisMqConsumerContainer config = new RedisMqConsumerContainer(redisTemplate); config.addConsumer(QueueConfiguration.builder() .queue("normalMessage") .consumer(normalMessageReceiver) .build()); return config; }

消息-生产者
public class QueueSender { private RedisTemplate redisTemplate; public QueueSender(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; }public void sendMsg(String queue, Object msg) { redisTemplate.opsForList().leftPush(queue, msg); } }

消息-消费者 消费接口类
public interface MsgConsumer { void onMessage(Object message); void onError(Object msg, Exception e); }

消息监听类
class QueueListener implements Runnable { public static final Logger log = LoggerFactory.getLogger(QueueListener.class); private RedisTemplate redisTemplate; private String queue; private MsgConsumer consumer; public QueueListener(RedisTemplate redisTemplate, String queue, MsgConsumer consumer) { this.redisTemplate = redisTemplate; this.queue = queue; this.consumer = consumer; }@Override public void run() { log.info("QueueListener start...queue:{}", queue); while (RedisMqConsumerContainer.run) { try { Object msg = redisTemplate.opsForList().rightPop(queue, 30, TimeUnit.SECONDS); if (msg != null) { try { consumer.onMessage(msg); } catch (Exception e) { consumer.onError(msg, e); } } } catch (QueryTimeoutException ignored) { } catch (Exception e) { if (RedisMqConsumerContainer.run) { log.error("Queue:{}", queue, e); } else { log.info("QueueListener exits...queue:{}", queue); } } } } }

消息-消费者容器 配置类
public class QueueConfiguration { /** * 队列名称 */ private String queue; /** * 消费者 */ private MsgConsumer consumer; private QueueConfiguration() { }public static Builder builder() { return new Builder(); }String getQueue() { return queue; }MsgConsumer getConsumer() { return consumer; }public static class Builder { private QueueConfiguration configuration = new QueueConfiguration(); public QueueConfiguration defaultConfiguration(MsgConsumer consumer) { configuration.consumer = consumer; configuration.queue = consumer.getClass().getSimpleName(); return configuration; }public Builder queue(String queue) { configuration.queue = queue; return this; }public Builder consumer(MsgConsumer consumer) { configuration.consumer = consumer; return this; }public QueueConfiguration build() { if (configuration.queue == null || configuration.queue.length() == 0) { if (configuration.consumer != null) { configuration.queue = configuration.getClass().getSimpleName(); } } return configuration; }} }

消息--消费者容器
public class RedisMqConsumerContainer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumerContainer.class); private Map consumerMap = new HashMap<>(); private RedisTemplate redisTemplate; static boolean run; private ExecutorService exec; public RedisMqConsumerContainer(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; }public void addConsumer(QueueConfiguration configuration) { if (consumerMap.containsKey(configuration.getQueue())) { log.warn("Key:{} this key already exists, and it will be replaced", configuration.getQueue()); } if (configuration.getConsumer() == null) { log.warn("Key:{} consumer cannot be null, this configuration will be skipped", configuration.getQueue()); } consumerMap.put(configuration.getQueue(), configuration); }public void destroy() { run = false; this.exec.shutdown(); log.info("QueueListener exiting."); while (!this.exec.isTerminated()) {} log.info("QueueListener exited."); }public void init() { run = true; this.exec = Executors.newCachedThreadPool(r -> { final AtomicInteger threadNumber = new AtomicInteger(1); return new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement()); }); consumerMap = Collections.unmodifiableMap(consumerMap); consumerMap.forEach((k, v) -> exec.submit(new QueueListener(redisTemplate, v.getQueue(), v.getConsumer()))); }}

配置消费者
public class NormalMessageReceiver implements MsgConsumer { private static Logger log = LoggerFactory.getLogger(TestListener.class); @Override public void onMessage(Object message) { log.info("收到消息:" + message); }@Override public void onError(Object msg, Exception e) { log.error("发生错误,消息:{}", msg, e); } }

测试代码如下:
//开启定时器功能 @EnableScheduling @Component public class MessageSender {@Autowired private QueueSender queueSender; @Scheduled(fixedDelay = 5000) public void sendMessageXXX() { //stringRedisTemplate.convertAndSend("chat1", String.valueOf(Math.random())); //producer.publishMessageXXX("队列消息1xxxxx"+System.currentTimeMillis()); for (int i = 0; i < 20; i++) { queueSender.sendMsg("TEST0","hello quit~~~~,序号:"+i); queueSender.sendMsg("TEST1","hello quit~~~~,序号:"+i); queueSender.sendMsg("TEST2","hello quit~~~~,序号:"+i); }} }

上述代码,有些来源与网络,在下只是在这里做个记忆。我进行删除操作。
redis 作为消息中间件只是在小系统中可以稍微用一下,如果对系统要求高的请使用真正的消息中间件。RabbitMQ和RocketMQ

    推荐阅读