rocketmq广播消息的(五)
一、简介
广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
二、代码
/** * 发布订阅消息生产者 */ public class BroadcastProducer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.32.128:9876"); // 3. 启动生产者 producer.start(); // 4. 生产者发送消息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); }// 5. 停止生产者 producer.shutdown(); } }
/** * 发布订阅消息生产者 */ public class BroadcastProducer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.32.128:9876"); // 3. 启动生产者 producer.start(); // 4. 生产者发送消息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); }// 5. 停止生产者 producer.shutdown(); } }
【rocketmq广播消息的(五)】
推荐阅读
- 危险也是机会
- python学习之|python学习之 实现QQ自动发送消息
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 夏夜|夏夜 我们
- Vue组件之事件总线和消息发布订阅详解
- Redis——发布订阅/消息队列
- Java消息中间件概念基础
- 【20190827复盘】——好消息
- MQ(消息队列)功能介绍
- 前任结婚的消息,究竟有多大杀伤力()