中间件——(六)RabbitMQ整合SpringCloudStream

RabbitMQ整合SpringCloudStream
文章目录

    • RabbitMQ整合SpringCloudStream
      • 【一】前言
      • 【二】Stream介绍
      • 【三】构建项目
      • 【四】添加maven依赖
      • 【五】创建接口
      • 【六】生产者,消费者
      • 【七】链接

【一】前言
SpringCloudStream 就是使用了基于消息系统的微服务处理架构。对于消息系统而言一共分为两类:基于应用标准的 JMS、基于协议标准的 AMQP,在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理,但是推荐还是基于 RabbitMQ 实现会更好一些。
【二】Stream介绍
中间件——(六)RabbitMQ整合SpringCloudStream
文章图片

最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
【三】构建项目
构建SpringBoot项目,如一下的结构目录:
中间件——(六)RabbitMQ整合SpringCloudStream
文章图片

【四】添加maven依赖
添加stream-rabbit依赖
org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.4.RELEASE

【五】创建接口
后面我们使用到的生产者,消费者,都会调用我们的接口进行输入输出
/** * 输入输出的接口 */ public interface Barista {String OUTPUT_CHANNEL = "output_channel"; String INPUT_CHANNEL = "input_channel"; //注解@Output声明了它是一个输出类型的通道 @Output(Barista.OUTPUT_CHANNEL) MessageChannel output(); //注解@Input声明了它是一个输入类型的通道 @Input(Barista.INPUT_CHANNEL) SubscribableChannel input(); }

【六】生产者,消费者
生产者封装:
server.port=8001 server.servlet.context-path=/producerspring.application.name=producer spring.cloud.stream.bindings.output_channel.destination=exchange-3 spring.cloud.stream.bindings.output_channel.group=queue-3 spring.cloud.stream.bindings.output_channel.binder=rabbit_clusterspring.cloud.stream.binders.rabbit_cluster.type=rabbit spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

@EnableBinding(Barista.class) @Service public class RabbitmqSender { @Autowired private Barista barista; // 发送消息 public void sendMessage(Object message, Map, Object> properties) throws Exception { try{ MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); boolean sendStatus = barista.output().send(msg); System.err.println("--------------sending -------------------"); System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus); }catch (Exception e){ System.err.println("-------------error-------------"); e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } }

【中间件——(六)RabbitMQ整合SpringCloudStream】消费者:
server.port=8002 server.context-path=/consumerspring.application.name=consumer spring.cloud.stream.bindings.input_channel.destination=exchange-3 spring.cloud.stream.bindings.input_channel.group=queue-3 spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster spring.cloud.stream.bindings.input_channel.consumer.concurrency=1 spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000 spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5spring.cloud.stream.binders.rabbit_cluster.type=rabbit spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

@EnableBinding(Barista.class) @Service public class RabbitmqReceiver {@StreamListener(Barista.INPUT_CHANNEL) public void receiver(Message message) throws Exception { Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); System.out.println("Input Stream 1 接受数据:" + message); System.out.println("消费完毕------------"); channel.basicAck(deliveryTag, false); } }

【七】链接
  • 中间件——(1)rabbitMQ的安装
  • 中间件——(2)rabbitMQ的相关知识点及概念
  • 中间件——(3)rabbitMQ的基本使用
  • 中间件——(4)rabbitMQ整合Spring
  • 中间件——(5)RabbitMQ整合SpringBoot
  • https://www.cnblogs.com/leeSmall/p/8900518.html
  • https://www.jianshu.com/p/404fc32122d1
  • https://my.oschina.net/u/3829444/blog/1840725

    推荐阅读