public class RpcHelloProvider implements IRpcHello {
public String hello(String name) {
return "Hello, " + name + “!”;
}
}
IRpcCalc的实现类如下:
public class RpcCalcProvider implements IRpcCalc {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public int sub(int a, int b) {
return a - b;
}
@Override
public int mul(int a, int b) {
return a * b;
}
@Override
public int div(int a, int b) {
return a / b;
}
}
Registry 注册中心主要功能就是负责将所有Provider的服务名称和服务引用地址注册到一个容器中(这里为了方便直接使用接口类名作为服务名称,前提是假定我们每个服务只有一个实现类),并对外发布。Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个Netty服务,创建RpcRegistry 类,RpcRegistry.java的具体代码如下:
public class RpcRegistry {
private final int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 处理拆包、粘包的编解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 处理序列化的编解码器
pipeline.addLast(“encoder”, new ObjectEncoder());
pipeline.addLast(“decoder”, new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 自己的业务逻辑
pipeline.addLast(new MyRegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 设置长连接
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
System.out.println("RPC Registry start listen at " + this.port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new RpcRegistry(8080).start();
}
}
接下来只需要实现我们自己的Handler即可,创建MyRegistryHandler.java,内容如下:
public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
// 在注册中心注册服务需要有容器存放
public static ConcurrentHashMap registryMap = new ConcurrentHashMap<>();
// 类名的缓存位置
private static final List classCache = new ArrayList<>();
// 约定,只要是写在provider下所有的类都认为是一个可以对完提供服务的实现类
// edu.xpu.rpc.provider
public MyRegistryHandler(){
scanClass(“edu.xpu.rpc.provider”);
doRegister();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
// 客户端传过来的调用信息
InvokerMessage request = (InvokerMessage)msg;
// 先判断有没有这个服务
String serverClassName = request.getClassName();
if(registryMap.containsKey(serverClassName)){
// 获取服务对象
Object clazz = registryMap.get(serverClassName);
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
result = method.invoke(clazz, request.getValues());
System.out.println(“request=” + request);
System.out.println(“result=” + result);
}
ctx.writeAndFlush(result);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
// 实现简易IOC容器
// 扫描出包里面所有的Class
private void scanClass(String packageName){
ClassLoader classLoader = this.getClass().getClassLoader();
URL url = classLoader.getResource(packageName.replaceAll(“\.”, “/”));
File dir = new File(url.getFile());
File[] files = dir.listFiles();
for (File file: files){
if(file.isDirectory()){
scanClass(packageName + “.” + file.getName());
}else{
// 拿出类名
String className = packageName + “.” + file.getName().replace(“.class”, “”).trim();
classCache.add(className);
}
}
}
// 把扫描到的Class实例化,放到Map中
// 注册的服务名称就叫做接口的名字 [约定优于配置]
private void doRegister(){
if(classCache.size() == 0) return;
for (String className: classCache){
try {
Class> clazz = Class.forName(className);
// 服务名称
Class> anInterface = clazz.getInterfaces()[0];
registryMap.put(anInterface.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
在这里还通过反射实现了简易的IOC容器,先递归扫描provider包底下的类,把这些类的对象作为服务对象放到IOC容器中进行管理,由于IOC是一个Map实现的,所以将类名作为服务名称,也就是Key,服务对象作为Value。根据消费者传过来的服务名称,就可以找到对应的服务,到此,Registry和Provider已经全部写完了。
2、consumer
目录结构如下:
└─src
├─main
│ ├─java
│ │ └─edu
│ │ └─xpu
│ │ └─rpc
│ │ ├─api
│ │ │ IRpcCalc.java
│ │ │ IRpcHello.java
│ │ │
│ │ ├─consumer
│ │ │ │ RpcConsumer.java
│ │ │ │
│ │ │ └─proxy
│ │ │ RpcProxy.java
│ │ │ RpcProxyHandler.java
│ │ │
│ │ └─core
│ │ InvokerMessage.java
│ │
│ └─resources
└─test
└─java
└─ pom.xml
在看客户端的实现之前,先梳理一下RPC流程。API 模块中的接口只在服务端实现了。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样,所以必定要对客户端的API接口做代理,隐藏网络请求的细节。
文章图片
由上图的流程图可知,要让用户调用无感知,必须创建出代理类来完成网络请求的操作。
RpcProxy.java如下:
public class RpcProxy {
public static T create(Class> clazz) {
//clazz传进来本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class> clazz;
public MethodProxy(Class> clazz) {
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果传进来是一个已实现的具体类
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
// 如果传进来的是一个接口(核心)
} else {
return rpcInvoke(method, args);
}
return null;
}
// 实现接口的核心方法
public Object rpcInvoke(Method method, Object[] args) {
// 传输协议封装
InvokerMessage invokerMessage = new InvokerMessage();
invokerMessage.setClassName(this.clazz.getName());
invokerMessage.setMethodName(method.getName());
invokerMessage.setValues(args);
invokerMessage.setParams(method.getParameterTypes());
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“frameDecoder”, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(“frameEncoder”, new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast(“encoder”, new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast(“decoder”, new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(“handler”, consumerHandler);
}
});
ChannelFuture future = bootstrap.connect(“localhost”, 8080).sync();
future.channel().writeAndFlush(invokerMessage).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
g 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 roup.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}
我们通过传进来的接口对象,获得了要调用的服务名,服务方法名,参数类型列表,参数列表,这样就把自定义的RPC协议包封装好了,只需要把协议包发出去等待结果返回即可,所以为了接收返回值数据还需要自定义一个接收用的Handler,RpcProxyHandlerdiamante如下:
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object result;
public Object getResponse() {
return result;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg;
}
【Java|基于Netty实现RPC框架】@Override
推荐阅读
- java相关|一次线上http连接被拒绝问题的排查
- Web前端|CSS的行内样式与内联样式,web中间开发
- java|网络编程懒人入门(一)(快速理解网络通信协议(上篇))
- java|IP协议报字段
- java|编程必备基础 计算机组成原理+操作系统+计算机网络
- 数据结构|线性表练习之Example040-删除单链表中数据域绝对值相等节点,仅保留第一次出现的节点而删除其余绝对值相等的节点
- 数据结构|线性表练习之Example018-删除单链表中所有值为 x 的节点
- JAVA|java-单链表反转解法及分析
- 链表|LeetCode - 24 - 两两交换链表中等的节点 - Java - 三种解法(递归 + 栈 + 迭代)