分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用

同步调用

  • 优点:时效性强,可以立即得到结果
  • 缺点:
  1. 耦合度高,每次加入新需求都要该原来的代码
  2. 性能和吞吐能力下降,调用者需要等待提供者响应后才能继续下一步操作
  3. 有额外资源消耗,调用者在等待服务响应过程中,不能释放请求占用的资源
  4. 有级联失效问题,如果服务提供者出现问题,所有调用方都会跟着出现问题
异步调用——通过Broker代理,调用者在请求broker后可以立即返回,无需等待所有结果返回后再响应,这里引用黑马的图片,很直观分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

优点:
  1. 耦合度低
  2. 吞吐量提升
  3. 故障隔离
  4. 流量削峰:当有大量请求时,可以先放在broker里,服务器根据自己的处理速度再去broker里处理请求,以此来缓解服务器压力
缺点:
  1. 依赖于Broker的可靠性、安全性、吞吐能力,所以必须保证broker足够可靠
  2. 架构复杂,业务没有明显的业务流程线,不好追踪管理
MQ简介
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是上述中事件驱动架构中的Broker,下面通过黑马的截图来看看几种常见的MQ区别分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

RabbitMQ学习
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网:https://www.rabbitmq.com/,结构如下

基本概念
  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息的队列
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
安装
这次安装是在以前安装好的docker容器上进行的安装,步骤如下
  1. 下载RabbitMQ
    docker pull rabbitmq:3-management#冒号后面是版本号

    阿萨
  2. 下载好后运行容器
    docker run \ -e RABBITMQ_DEFAULT_USER=itcast \#用户名 -e RABBITMQ_DEFAULT_PASS=123321 \#密码 --name mq \#容器名字 --hostname mq1 \#主机名 -p 15672:15672 \#rabbitmq的管理平台端口,可通过该端口登录管理平台 -p 5672:5672 \#收发消息的端口 -d \#后台运行 rabbitmq:3-management #镜像名称

    完成后就安装好了
可以通过输入IP地址和端口在浏览器访问分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片


常见的消息模型
基本消息队列,下图是官网解释分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片


工作消息队列概念分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

【分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用】以上两种都是通过队列来发送,没有exchange交换机的概念,并且消息被消费后立即销毁
以下三种都属于发布订阅模式(允许将同一个消息发送给多个消费者),根据交换机类型不同又分为三类,分别是
  • 广播模式(Fanout),一次向多个消费者发送消息,x指的是exchange交换机
  • 分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片


  • 路由模式(Direct),选择性接收消息

    分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片

  • 主题模式(Topic),基于模式(主题)接收消息 分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片


基本消息队列消息发送流程
  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息
基本消息队列消息接收流程
  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定
SpringAMQP
AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP:是基于AMQP协议的一套api规范,提供了模板来收发消息。包括了两部分,其中spring-amqp是基础抽象,spring-rabbit是底层默认实现的,官网地址Spring AMQP
通俗来讲就是通过官方原生的方法来创建消息队列太麻烦了,spring现在通过封装底层实现来帮我们简化了创建流程和代码难度
下面通过SpringAMQP创建做一个最简单的基本消息队列
  1. 创建一个父工程并引入相关依赖
    org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test

  2. 创建推送者Publisher子项目,并且配置yml文件
    logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # rabbitmq收发消息的端口 username: itcast#rabbitmq用户名 password: 123321#rabbitmq密码 virtual-host: /#虚拟主机名字

  3. 写一个单元测试
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; //引入rabbitmq的协议标准对象@Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; //队列名字 String message = "hello, spring amqp!"; //队列消息 rabbitTemplate.convertAndSend(queueName, message); } }

这里注意,我指定的队列名字叫做"simple.queue",但是在rabbitmq里面没有这个队列,需要我手动去创建一个,如下分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

创建成功后,启动单元测试,点击队列名字,看一下效果分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

以上就是发送消息到队列的过程,但并没有接收消息,现在写如何接收队列里的消息
  1. 创建Consumer消费者springboot子工程
  2. 引入依赖(父工程引过了所以可以不用引)
  3. 配置yml文件
    logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: /

  4. 创建一个监听类,这里取名listener,
    @Component//注入为spring的一个bean,好让spring监控到 public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")//这里是指定我们监听的是哪一个消息队列 public void listenSimpleQueue(String msg){ System.out.println("消费者接收到的消息是:"+msg); } }

    完成后启动application启动类,然后就可以接收消息了,如下
    注意:消费完后队列里的消息会被销毁,所以再去rabbitmq的队列里就看不见已经消费过的消息了
下面通过SpringAMQP创建做一个工作队列(work模型),就是将多个消费者绑定到同一队列,同一条消息只会被一个消费者处理分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

任务:在一秒内由p端发送50条数据,C1处理速度是40条每秒,C2是10条每秒,让两个消费者协同处理
  1. 在发送端写好发送代码
    @Test public void WorkQueue() throws InterruptedException { String queueName="simple.queue"; String message="工作队列"; for (int i = 0; i <50 ; i++) { rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20); //这里表示50条数据在一秒钟发完 }

  2. 在消费者端写好两个消费者代码
    @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到的消息是:"+msg+LocalTime.now()); Thread.sleep(25); //表示消费者1每秒处理40个消息 } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到的消息是:"+msg+LocalTime.now()); Thread.sleep(100); //表示消费者2每秒处理10个消息 }

    这里要注意的是,rabbitmq默认才是消息预取的方式,就是两个消费者都先把所有消息从队列里拿过来再处理,如果有50个消息,每个消费者就先拿25个,到手后再一起处理,如果两个消费者的消费能力不一样,这显然是有弊端的,所以需要进行如下配置
  3. 在yml文件做配置
    logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1#这是新加的配置,表示每个消费者最多先预取1个消息,消费完后再取下一个,保证了每个消费者都根据自己的消费速度来处理消息

  4. 然后分别启动消息发送者和消费者,查看情况,如果电脑处理器正常一般就是消费者一接收了40条消息,消费者2接受了10条消息,耗时在一秒内
发布订阅模式案列学习
首先是广播模式案列实现,Fanout exchange会将收到的消息路由到每一个跟其绑定的queue
步骤如下:
  1. 在消息发送端写发送代码,以前都是直接发给队列,现在是发送给交换机,所以代码如下
    @Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "exchange01"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }

  2. 在消费者端新建一个配置类,用于建立交换机和队列名称,并将交换机和队列绑定,代码如下
    @Configuration//表示该类为配置类 public class FanoutConfig { // 交换机有多种类型,这里是广播模式,所以声明FanoutExchange交换机名字叫做"exchange01" @Bean//声明为一个bean交给spring管理 public FanoutExchange fanoutExchange(){ return new FanoutExchange("exchange01"); }// 声明队列名字叫做fanout.queue1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); }// 绑定队列1到交换机 @Bean public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); //表示将fanoutQueue1队列绑定到fanoutExchange交换机 }// fanout.queue2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); }// 绑定队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2) .to(fanoutExchange); } }

  3. 完成消费者代码的编写
    @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到fanout.queue1的消息是:"+"{"+msg+"}"); }@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到fanout.queue2的消息是:"+"{"+msg+"}"); }

    完成后重启服务,效果如下分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片

如此便实现了一个消息被多个消费者消费的目的
路由模式(routes)
前面介绍的广播模式,可以将一个消息发送给多个与路由器绑定的队列;而现在介绍的路由模式,是把消息发送给与路由器绑定了相同key值的队列,实现了消息的指定发送,即选择性接收消息
发布者发送消息时,会指定一个RoutingKey,而队列则会绑定一个bindingKey,当这两个参数的值相同时,路由器才会将消息路由到队列中去,现在做一个案列,来实现消息的指定发送
  1. 这次先写发送端代码
    @Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "direct exchange"; // 消息 String message = "发给所有队列"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "common" /*这里的common就是routingKey,要和路由器的bindingkey相对应才能成功接收到值*/, message); }

  2. 写接收端代码,这次不通过bean的方式来写,直接在注解上配置队列名字和路由器名字及类型
    //路由模式 @RabbitListener(bindings = @QueueBinding( value = https://www.it610.com/article/@Queue(name ="direct.queue1"), //声明队列的名字 exchange = @Exchange(name = "direct exchange", type = ExchangeTypes.DIRECT), //声明路由器的名字和路由器的类型,这里是direct类型 key = {"common", "队列A的key"} //声明路由器的key值,这里声明一个共有的common和一个特有的”队列A的key“ )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); }@RabbitListener(bindings = @QueueBinding( value= https://www.it610.com/article/@Queue(name ="direct.queue2"), exchange = @Exchange(name = "direct exchange",type = ExchangeTypes.DIRECT), key = {"common","队列B的key"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息"+"【" + msg + "】"); }

    注意看接收端的代码,有一个key值是common,即是共有的key值,我们在发送端也写有common这个值,启动服务看效果分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片

可以发现通过共有的key值,路由器把消息分别发送给了两个队列,由两个消费者端接收到了一样的消息,这和广播模式很像
现在修改routingKey值,改为某个队列特有的key,将key值改为"队列A的key",看效果分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
文章图片

现在就只有direct.queue1队列才能收到消息了,另外一个队列因为key值不同所有没能收到消息,由此实现了路由模式的选择性接收消息的效果

话题模式(Topic)
TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
并且在绑定时可以以#或*作为通配符
#:代指一个或多个单词
*:代指一个单词
话题模式就是要同一个话题的消息才能被接收,比如有中国天气和外国天气,中国军事和外国军事,这时如果设置routingKey为中国.#,则发送中国天气和中国军事这两个板块。如果设置为#.军事,则发送中国军事和外国军事这两个板块,下面举例
  1. 先写发送端
    @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "这是中国军事频道"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.junshi" , message); }@Test public void testSendTopicExchange2() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "这是外国军事频道"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "waiguo.junshi" , message); } }


  2. 写接收端代码,和路由模式差不多,就是bandingKey要改一下
    @RabbitListener(bindings = @QueueBinding( value = https://www.it610.com/article/@Queue(name="topce.queue1"), exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC), key = "china.#" ) ) public void listenTopicQueue1(String msg){ System.out.println("只接收到与china相关的消息,不管是什么内容"+"【" + msg + "】"); }@RabbitListener(bindings = @QueueBinding( value = https://www.it610.com/article/@Queue(name="topce.queue2"), exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC), key = "#.junshi" ) ) public void listenTopicQueue2(String msg){ System.out.println("只接收到junshi相关的消息,不管是哪个国家的"+"【" + msg + "】"); }

    先写启动中国军事频道的测试类,会发现接收端所有中国的话题会报道中国军事,军事话题会报道军事分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片

    再把发送端代码的中国军事改为天气
    @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "今天中国气温适中"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.tianqi" , message); }

    效果如下分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片


消息转换器
在springAMQP的发送方法中,接收消息的类型是Object,也就是所我们可以发送任意对象的消息,SpringAMQP会自动序列化字节后发送
下面通过创建对象的方式,将对象从发送端传递到消费端,看看如何实现
  1. 在夫父工程引入相关依赖
    com.fasterxml.jackson.core jackson-databind

  2. 在消费者端配置类声明一个队列
    @Bean public Queue objectQueue(){ return new Queue("object.queue"); }

  3. 在消费者端写好接收代码
    @RabbitListener(queues = "object.queue") public void listenObjectQueue2(List msg){ System.out.println("接收到object.queue的消息:" + msg); }

  4. 在发送端写发送代码,以对象形势发送,这里用list对象
    @Test public void testSendObject() { List list =new ArrayList(); list.add("小鱼"); list.add("大鱼"); // 发送消息 rabbitTemplate.convertAndSend( "object.queue",list); }

  5. 分别在发送端和接收端的启动类写转换工具的方法
    @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }

    然后启动服务,效果如下分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
    文章图片


    推荐阅读