归志宁无五亩园,读书本意在元元。这篇文章主要讲述Netty-UDP与HTTPS协议相关的知识,希望能为你提供帮助。
写在前面
最近在使用netty进行数据采集的接收入口,协议分别使用了HTTPS协议和UDP协议,特来总结一下:
一、UDP协议:
1. netty服务端代码:
先进行处理器配置和端口绑定:处理器里就是具体的对接收到的数据进行业务处理。
public void startReceiveProcess() {
LOGGER.info("UdpReceiver start receive process...");
acceptGroup = new NioEventLoopGroup(1000);
bootstrap = new Bootstrap();
bootstrap.group(acceptGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
// 指定处理器
.handler(new ChannelInitializer<
DatagramChannel>
() {
@Override
protected void initChannel(DatagramChannel datagramChannel) throws Exception {
ChannelPipeline pipeline = datagramChannel.pipeline();
pipeline.addLast(new UdpHandler());
}
});
try {
int port = baseSource.getPort();
LOGGER.info("UdpReceiver netty port is: {}", port);
// 绑定端口
udpChanel = bootstrap.bind(port).sync().channel();
udpChanel.closeFuture().await(1000);
} catch (InterruptedException ex) {
LOGGER.error("udp netty exception: ", ex);
closeSocket();
}
}
然后具体的处理逻辑如下:
private class UdpHandler extends SimpleChannelInboundHandler<
DatagramPacket>
{
@Override
protected void channelRead0(ChannelHandlerContext context, DatagramPacket msg) {
if (Objects.nonNull(msg)) {
ByteBuf content = msg.content();
byte[] msgByte = ByteUtil.byteBuf2Byte(content);
String receive = new String(msgByte);
LOGGER.info("udp netty server receive data: {}", receive);
// TODO 进行数据具体处理
baseSource.offerSource(msgByte);
// TODO 响应信息
context.writeAndFlush("");
}
}@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
LOGGER.error("metadata server catch exception: ", cause);
Channel channel = context.channel();
if (channel != null) {
channel.close();
}
context.close();
}
}
2. 特别注意:
此处的DatagramPacket是io.netty里面的类,而不是java.net里面的类
曾遇到了一个很容易忽略的问题就是因为包引入错误。
我曾按照此方式写了一个UDP的客户端来进行数据发送,但是服务端一直没有打印接收到的数据,也没有错误信息,只有日志里面出现:
DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DatagramPacket(/127.0.0.1:64692 =>
/0:0:0:0:0:0:0:0:30204, PooledUnsafeDirectByteBuf(ridx: 0, widx: 323, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [UdpReceiver$UdpHandler#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x18250f38, L:/0:0:0:0:0:0:0:0:30204].
看上去收到了,但又感觉没收到。收到的长度确实也是对的,但没有打印具体的消息。
文章图片
原来就是因为包引入错误,导致不会去处理消息,打断点也发现不会进入SimpleChannelInboundHandler的channelRead方法里去,引入io.netty.channel.socket.DatagramPacket后,就可以正常接收了。
这两者的写法有细微的差别:
io.netty.channel.socket.DatagramPacket
// 接收到的数据想处理的话,需要转换,因为获取到的为ByteBuf
如下:
ByteBuf content = packet.content();
// 需要通过byteBuf转化为byte数组,然后才能获取到具体字符串格式的信息。
String clientMessage = new String(bytebuf2Byte(content),"UTF-8");
public static byte[] bytebuf2Byte(ByteBuf in) {
int len = in.readableBytes();
byte[] arr = new byte[len];
in.getBytes(0, arr);
return arr;
}
如果是java.net.DatagramPacket
byte[] msg = packet.getData()
String msgStr = new String(msg, "utf-8");
// 获取到的直接是byte数组。
3. 客户端测试:
用jmeter装插件也是可以发UDP请求的,但由于公司网络限制,无法下载就jmeter相关插件,所以只能通过自己写客户端程序进行请求发送。
核心代码为:
ByteBuf dataBuf = Unpooled.copiedBuffer(msg, Charset.forName("UTF-8"));
DatagramPacket datagramPacket = new DatagramPacket(dataBuf, inetSocketAddress)channel.writeAndFlush(datagramPacket).addListener(new GenericFutureListener<
ChannelFuture>
() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
boolean success = future.isSuccess();
System.out.println("Sender datagramPacket result : " + success);
}
});
二、HTTPS协议
1. HTTPS协议代码:
public void start() {
bootstrap = new ServerBootstrap();
bootstrap.channel(NioserverSocketChannel.class);
bootstrap.option(NioChannelOption.SO_BACKLOG, RestfulParam.NIO_SO_BACKLOG_LENGTH);
bootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
businessGroup = new UnorderedThreadPoolEventExecutor(200, new DefaultThreadFactory("business"));
try {
bootstrap.group(bossGroup, workGroup);
// TODO 稍后分析此部的作用
SslContext sslContext = ContextSslFactory.setJksToNetty();
metricsHandler = new MetricsHandler();
bootstrap.childHandler(new ChannelInitializer<
NioSocketChannel>
() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("metricHandler", metricsHandler);
pipeline.addLast("idleHandler", new ServerIdleCheckHandler());
pipeline.addLast("ssl", sslContext.newHandler(ch.alloc()));
pipeline.addLast("decoder", new HttpRequestDecoder(4096, 8192, 2097152));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(2097152));
pipeline.addLast(businessGroup, new HttpRequestHandler());
}
});
channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
} catch (Exception exception) {
LOGGER.error("set jks to netty error!!!", exception);
}
}
同样的,HttpRequestHandler是具体的业务处理实现:
public class HttpRequestHandler extends SimpleChannelInboundHandler<
FullHttpRequest>
{
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
String uri = request.uri();
// 创建http响应
FullHttpResponse response;
if (sourceConfig.getActiveApis().contains(uri)) {
// 具体业务处理
requestHandlerMap.get(uri).logMsgReceived(request, ctx);
response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
}
// 设置头信息
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;
charset=UTF-8");
// 状态回写到客户端
if (ctx.channel().isActive() &
&
ctx.channel().isWritable()) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// ......
}
}
【Netty-UDP与HTTPS协议】服务端和UDP协议的是同样的套路,只是接收数据的类型不一样。
2. 额外小知识:
对于HTTPS协议来说,通常会要求进行认证,因为客户端对于服务端来说是否可信,很重要,所以客户端需要有服务端提供的证书,来保证自己发送的请求对于服务端来说是可信的,然后才会被服务端接收。
所以会在Chandler中加了:pipeline.addLast(" ssl" , sslContext.newHandler(ch.alloc()));
SslContext sslContext = ContextSslFactory.setJksToNetty();
至于这里面具体做了些什么,主要就是读取证书,用来校验客户端连接的。
public static SslContext setJksToNetty() throws Exception {
LOGGER.info("Begin to set JKS to netty !");
SslContext sslContext;
String normalPath = Normalizer.normalize(“身份证书路径”, Normalizer.Form.NFKC);
String jksCanonicalPath = new File(normalPath).getCanonicalPath();
try (InputStream is = FileUtils.openInputStream(new File(jksCanonicalPath))) {
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(is, “证书密码”);
KeyManagerFactory managerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
managerFactory.init(keyStore, pwd.toCharArray());
sslContext = SslContextBuilder.forServer(managerFactory)
.protocols("TLSv1.2")
.trustManager(new File(“信任证书路径”))
.clientAuth(ClientAuth.REQUIRE)
// 加密协议 .ciphers("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256")
.build();
}
return sslContext;
}
3. 如何测试 ?
这时候就可以用jmeter来上报数据了,大概配置就是:
文章图片
对于用了证书的jmeter该怎么配置呢?
区别在于启动方式,不用证书的情况下可以直接使用jmeter.bat启动,使用证书的情况下,启动为:
jmeter -Djavax.net.ssl.trustStore=D:\\DailyDev\\jmeter\\Network_Product_CA.cer -Djavax.net.ssl.keyStorePassword=密码 -Djavax.net.ssl.keyStore=D:\\DailyDev\\jmeter\\keystore.jks
当然也可以在jmeter.properties里面配置好证书路径和密码,然后直接启动:
文章图片
三、总结
其实关于netty的常用代码大概就是这么多了,但是关于netty的底层知识,还是有很多内容的,毕竟很强大,不然也不会有那么多三方件都用netty来进行底层的网络交互了,像kafka等那些,在引入jar的时候,都可以看到里面包含了netty的相关依赖。
推荐阅读
- super-vlan 和 mux-vlan
- Spark的两种核心Shuffle详解
- NAT原理总结iptables实现SNAT和DNAT。
- Java split函数的坑
- kafka 和zookeeper集群配置参考
- 吃透方法内联
- 戴尔台式机3020安装 Centos7
- 面试题 -- 如何设计一个线程池
- filebeat+logstash顺序存储日志