枕上诗书闲处好,门前风景雨来佳。这篇文章主要讲述#yyds干货盘点#RabbitMQ示例6:远程过程调用RPC相关的知识,希望能为你提供帮助。
作者:汤圆
个人博客: javalover.cc
远程过程调用RPC
目录
- 定义
- 知识点:消息属性、RPC工作流程
- 简单DEMO
知识点 1. 消息属性
回复队列(
replyTo
)【#yyds干货盘点#RabbitMQ示例6(远程过程调用RPC)】比如本地向远程请求调用函数fun,同时传递了这个
replyTo
属性,那么远程机器执行完后,会将结果返回到这个replyTo
队列关联ID(
correlationId
)比如本地向远程请求调用函数fun,同时传递了
correlationId
属性,那么远程机器执行完后,会将correlationId
添加到消息属性中;当本地收到消息后,会校验收到的
correlationId
是否和本地的一致2. RPC工作流程
文章图片
- 客户端发送消息到
rpc_queue
队列,其中消息包含了两个属性reply_to
和correlation_id
- 服务端从
rpc_queue
队列接收到消息,进行处理,将结果和correlation_id
属性发送到reply_to
队列 - 客户端从
reply_to
队列接收到消息,比对correlation_id
关联ID,如果一致,则进行对应的处理
RPCClient.java
public class RPCClient
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args)
ConnectionFactory factory = new ConnectionFactory();
try
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
String message = "10";
// 1. 定义消息属性
// 1.1. 定义唯一的correlationId:用来区分消息,服务端处理完会将该id包含到消息属性中
final String correlationId = UUID.randomUUID().toString();
// 1.2 定义回复队列:服务端处理完会将结果发送到这个回复队列
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.correlationId(correlationId)
.replyTo(replyQueueName)
.build();
// 2. 发布消息到RPC队列
channel.basicPublish("", RPC_QUEUE_NAME, properties, message.getBytes());
System.out.println("【client】send fib index:" + message);
// 3. 接收消息的回调函数:接收服务端的消息,并进行处理
DeliverCallback callback = (consumeTag, deliver)->
// 3.1. 取出消息的correlationId
String correlationId2 = deliver.getProperties().getCorrelationId();
// 3.2. 跟之前发送消息时的做对比,如果一致,则打印结果
if(correlationId2.equals(correlationId))
String res = new String(deliver.getBody(), "utf-8");
System.out.println("【client】received fib res:" + res);
// 3.3. 如果不一致,则忽略
;
channel.basicConsume(replyQueueName, true, callback, consumeTag->
);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
2. 服务端
RPCServer.java
public class RPCServer private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args)
ConnectionFactory factory = new ConnectionFactory();
try
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
DeliverCallback callback = (consumeTag, deliver)->
// 1. 解析消息,获取消息内容、消息回复队列、消息关联id
// 1.1 回复队列:处理完请求数据后,将结果发送到这个队列
String queueName = deliver.getProperties().getReplyTo();
// 1.2 获取消息内容
String msg = new String(deliver.getBody(), "utf-8");
System.out.println("【server】received fib index: " + msg);
// 解析出需要执行fib的参数
int num = Integer.parseInt(msg);
// 1.3 获取关联id,该id类似于会话id
String correlationId = deliver.getProperties().getCorrelationId();
// 1.4 构建回复消息的属性:主要是将关联id包含进去
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(correlationId)
.build();
// 2. 执行fib函数,处理传来的消息内容
int fibNum = fib(num);
String fibStr = fibNum + "";
System.out.println("【server】send fib res: " + fibStr);
// 3. 回复消息
channel.basicPublish("", queueName, replyProps, fibStr.getBytes("utf-8"));
;
channel.basicConsume(RPC_QUEUE_NAME, true, callback, consumeTag->
);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
public static int fib(int n)
if(n == 0) return 0;
if(n == 1) return 1;
return fib(n-1)+fib(n-2);
3. 演示效果
- 先运行服务端代码
RPCServer.java
,然后再运行客户端代码RPCClient.java
- 可以看到,客户端发送了10,收到了55;服务端接收到了10,发送了55;
推荐阅读
- hudi使用cow生成parquet格式用hive查询的问题
- 老大说(谁要再用double定义商品金额,就自己收拾东西走)
- 第十一节:Springboot整合log4j2日志
- #过年不停更#HarmonyOS自定义JS组件—灵动的锦鲤
- # yyds干货盘点 # 手把手教你抖音系列视频批量下载器开发
- #yyds干货盘点# Spring 源码三千问同样是AOP代理bean,为什么@Async标记的bean循环依赖时会报错()
- MySQL 数据库SQL 语句的高阶运用
- #yyds干货盘点#nmap(网络探测工具和安全/端口扫描器)
- 矿用防爆LED显示屏(单色,双色,全彩)T:l35-627l-8Oll