#yyds干货盘点#RabbitMQ示例5(主题topic交换机)

智慧并不产生于学历,而是来自对于知识的终生不懈的追求。这篇文章主要讲述#yyds干货盘点#RabbitMQ示例5:主题topic交换机相关的知识,希望能为你提供帮助。
作者:汤圆
个人博客:javalover.cc
远程过程调用RPC 目录

  • 定义
  • 知识点:消息属性、RPC工作流程
  • 简单DEMO
定义远程过程调用,就是在一个机器上调用另一个远程机器上的函数,并返回结果
知识点 1. 消息属性
回复队列(replyTo)
比如本地向远程请求调用函数fun,同时传递了这个replyTo属性,那么远程机器执行完后,会将结果返回到这个replyTo队列
关联ID(correlationId)
比如本地向远程请求调用函数fun,同时传递了correlationId属性,那么远程机器执行完后,会将correlationId添加到消息属性中;
当本地收到消息后,会校验收到的correlationId是否和本地的一致
2. RPC工作流程
#yyds干货盘点#RabbitMQ示例5(主题topic交换机)

文章图片

  • 客户端发送消息到rpc_queue队列,其中消息包含了两个属性reply_tocorrelation_id
  • 服务端从rpc_queue队列接收到消息,进行处理,将结果和correlation_id属性发送到reply_to队列
  • 客户端从reply_to队列接收到消息,比对correlation_id关联ID,如果一致,则进行对应的处理
基础DEMO1. 客户端
RPCClient.java
public class RPCClient private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) ConnectionFactory factory = new ConnectionFactory(); try Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); String message = "10"; // 1. 定义消息属性 // 1.1. 定义唯一的correlationId:用来区分消息,服务端处理完会将该id包含到消息属性中 final String correlationId = UUID.randomUUID().toString(); // 1.2 定义回复队列:服务端处理完会将结果发送到这个回复队列 String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .correlationId(correlationId) .replyTo(replyQueueName) .build(); // 2. 发布消息到RPC队列 channel.basicPublish("", RPC_QUEUE_NAME, properties, message.getBytes()); System.out.println("【client】send fib index:" + message); // 3. 接收消息的回调函数:接收服务端的消息,并进行处理 DeliverCallback callback = (consumeTag, deliver)-> // 3.1. 取出消息的correlationId String correlationId2 = deliver.getProperties().getCorrelationId(); // 3.2. 跟之前发送消息时的做对比,如果一致,则打印结果 if(correlationId2.equals(correlationId)) String res = new String(deliver.getBody(), "utf-8"); System.out.println("【client】received fib res:" + res); // 3.3. 如果不一致,则忽略 ; channel.basicConsume(replyQueueName, true, callback, consumeTag-> ); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace();

2. 服务端
RPCServer.java
public class RPCServer private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) ConnectionFactory factory = new ConnectionFactory(); try Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); DeliverCallback callback = (consumeTag, deliver)-> // 1. 解析消息,获取消息内容、消息回复队列、消息关联id // 1.1 回复队列:处理完请求数据后,将结果发送到这个队列 String queueName = deliver.getProperties().getReplyTo(); // 1.2 获取消息内容 String msg = new String(deliver.getBody(), "utf-8"); System.out.println("【server】received fib index: " + msg); // 解析出需要执行fib的参数 int num = Integer.parseInt(msg); // 1.3 获取关联id,该id类似于会话id String correlationId = deliver.getProperties().getCorrelationId(); // 1.4 构建回复消息的属性:主要是将关联id包含进去 AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(correlationId) .build(); // 2. 执行fib函数,处理传来的消息内容 int fibNum = fib(num); String fibStr = fibNum + ""; System.out.println("【server】send fib res: " + fibStr); // 3. 回复消息 channel.basicPublish("", queueName, replyProps, fibStr.getBytes("utf-8")); ; channel.basicConsume(RPC_QUEUE_NAME, true, callback, consumeTag-> ); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace(); public static int fib(int n) if(n == 0) return 0; if(n == 1) return 1; return fib(n-1)+fib(n-2);

3. 演示效果
  • 先运行服务端代码RPCServer.java,然后再运行客户端代码RPCClient.java
  • 可以看到,客户端发送了10,收到了55;服务端接收到了10,发送了55;
#yyds干货盘点#RabbitMQ示例5(主题topic交换机)

文章图片

#yyds干货盘点#RabbitMQ示例5(主题topic交换机)

文章图片

参考【#yyds干货盘点#RabbitMQ示例5(主题topic交换机)】RabbitMQ官网教程:第六节

    推荐阅读