RabbitMQ整合SpringCloudStream
文章目录
-
- RabbitMQ整合SpringCloudStream
-
- 【一】前言
- 【二】Stream介绍
- 【三】构建项目
- 【四】添加maven依赖
- 【五】创建接口
- 【六】生产者,消费者
- 【七】链接
【一】前言
SpringCloudStream 就是使用了基于消息系统的微服务处理架构。对于消息系统而言一共分为两类:基于应用标准的 JMS、基于协议标准的 AMQP,在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理,但是推荐还是基于 RabbitMQ 实现会更好一些。
【二】Stream介绍
文章图片
最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
【三】构建项目
构建SpringBoot项目,如一下的结构目录:
文章图片
【四】添加maven依赖
添加stream-rabbit依赖
org.springframework.cloud
spring-cloud-starter-stream-rabbit
1.3.4.RELEASE
【五】创建接口
后面我们使用到的生产者,消费者,都会调用我们的接口进行输入输出
/**
* 输入输出的接口
*/
public interface Barista {String OUTPUT_CHANNEL = "output_channel";
String INPUT_CHANNEL = "input_channel";
//注解@Output声明了它是一个输出类型的通道
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel output();
//注解@Input声明了它是一个输入类型的通道
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel input();
}
【六】生产者,消费者
生产者封装:
server.port=8001
server.servlet.context-path=/producerspring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_clusterspring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 发送消息
public void sendMessage(Object message, Map, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.output().send(msg);
System.err.println("--------------sending -------------------");
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
System.err.println("-------------error-------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
【中间件——(六)RabbitMQ整合SpringCloudStream】消费者:
server.port=8002
server.context-path=/consumerspring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受数据:" + message);
System.out.println("消费完毕------------");
channel.basicAck(deliveryTag, false);
}
}
【七】链接
- 中间件——(1)rabbitMQ的安装
- 中间件——(2)rabbitMQ的相关知识点及概念
- 中间件——(3)rabbitMQ的基本使用
- 中间件——(4)rabbitMQ整合Spring
- 中间件——(5)RabbitMQ整合SpringBoot
- https://www.cnblogs.com/leeSmall/p/8900518.html
- https://www.jianshu.com/p/404fc32122d1
- https://my.oschina.net/u/3829444/blog/1840725
推荐阅读
- MQ|kafka 文件存储 消息同步机制
- java基础|mq消息积压怎么对应
- kubesphere|(十三)从零开始搭建k8s集群——使用KubeSphere管理平台搭建一个高可用的zookeeper版kafka集群服务
- rocket|rocket mq 监听端口_微服务异步架构——MQ之RocketMQ
- Java|JAVA常用工具收集总结
- MQ|基于Swoft2.x框架实现php操作rabbitMQ
- kubesphere|(十四)从零开始搭建k8s集群——使用KubeSphere管理平台搭建一个高可用的基于Raft的kafka集群服务