不操千曲而后晓声,观千剑而后识器。这篇文章主要讲述java 从零开始手写 RPC (02)-netty4 实现客户端和服务端相关的知识,希望能为你提供帮助。
说明上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。
至于 netty 的优点可以参考:
http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty
文章图片
代码实现 maven 引入
<
dependency>
<
groupId>
io.netty<
/groupId>
<
artifactId>
netty-all<
/artifactId>
<
version>
${netty.version}<
/version>
<
/dependency>
引入 netty 对应的 maven 包,此处为 4.1.17.Final。
服务端代码实现netty 的服务端启动代码是比较固定的。
package com.github.houbb.rpc.server.core;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.server.constant.RpcServerConst;
import com.github.houbb.rpc.server.handler.RpcServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;
/**
* rpc 服务端
* @author binbin.hou
* @since 0.0.1
*/
public class RpcServer extends Thread {private static final Log log = LogFactory.getLog(RpcServer.class);
/**
* 端口号
*/
private final int port;
public RpcServer() {
this.port = RpcServerConst.DEFAULT_PORT;
}public RpcServer(int port) {
this.port = port;
}@Override
public void run() {
// 启动服务端
log.info("RPC 服务开始启动服务端");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<
Channel>
() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new RpcServerHandler());
}
})
// 这个参数影响的是还没有被accept 取出的连接
.option(ChannelOption.SO_BACKLOG, 128)
// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的链接
ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
log.info("RPC 服务端启动完成,监听【" + port + "】端口");
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("RPC 服务端关闭完成");
} catch (Exception e) {
log.error("RPC 服务异常", e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}}
为了简单,服务端启动端口号固定,RpcServerConst 常量类内容如下:
public final class RpcServerConst {private RpcServerConst(){}/**
* 默认端口
* @since 0.0.1
*/
public static final int DEFAULT_PORT = 9627;
}
RpcServerHandler当然,还有一个比较核心的类就是 RpcServerHandler
public class RpcServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// do nothing now
}
}
目前是空实现,后续可以添加对应的日志输出及逻辑处理。
测试【java 从零开始手写 RPC (02)-netty4 实现客户端和服务端】启动测试的代码非常简单:
/**
* 服务启动代码测试
* @param args 参数
*/
public static void main(String[] args) {
new RpcServer().start();
}
说明上面我们实现了服务端的实现,这一节来一起看一下 client 客户端代码实现。
代码实现 RpcClient
/*
* Copyright (c)2019. houbinbin Inc.
* rpc All rights reserved.
*/package com.github.houbb.rpc.client.core;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* <
p>
rpc 客户端 <
/p>
*
* <
pre>
Created: 2019/10/16 11:21 下午<
/pre>
* <
pre>
Project: rpc<
/pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClient extends Thread {private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 监听端口号
*/
private final int port;
public RpcClient(int port) {
this.port = port;
}public RpcClient() {
this(9527);
}@Override
public void run() {
// 启动服务端
log.info("RPC 服务开始启动客户端");
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<
Channel>
(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new RpcClientHandler());
}
})
.connect("localhost", port)
.syncUninterruptibly();
log.info("RPC 服务启动客户端完成,监听端口:" + port);
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("RPC 服务开始客户端已关闭");
} catch (Exception e) {
log.error("RPC 客户端遇到异常", e);
} finally {
workerGroup.shutdownGracefully();
}
}}
.connect("localhost", port)
声明了客户端需要连接的服务端,此处和服务端的端口保持一致。RpcClientHandler客户端处理类也比较简单,暂时留空。
/*
* Copyright (c)2019. houbinbin Inc.
* rpc All rights reserved.
*/package com.github.houbb.rpc.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* <
p>
客户端处理类 <
/p>
*
* <
pre>
Created: 2019/10/16 11:30 下午<
/pre>
* <
pre>
Project: rpc<
/pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler {@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// do nothing.
}}
启动测试 服务端
首先启动服务端。
客户端
然后启动客户端连接服务端,实现如下:
/**
* 服务启动代码测试
* @param args 参数
*/
public static void main(String[] args) {
new RpcClient().start();
}
小结为了便于大家学习,以上源码已经开源:
我是老马,期待与你的下次重逢。
文章图片
推荐阅读
- 初识函数递归
- 深入浅出CMS垃圾收集器
- SpringBoot整合ActiveMq实现Queue和Topic两种模式(图文)
- 究竟 JDK 给我们提供了哪几种线程池,分别有什么特点()
- MySQL强人“锁”难《死磕MySQL系列 三》
- 100台机器上海量IP如何查找出现频率 Top 100()
- 基于CDH 6.3.0 搭建 Hive on Spark 及相关配置和调优
- 鸿蒙开源全场景应用开发——通讯协议
- 优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析