Netty应用


目前网上我没有看到让我觉得满意的文章,netty实战这本书很多东西也是一笔带过,并没有做详细的解释,很多东西只有靠自己测试,本文不会详细的介绍各种概念,建议先看netty实战。本文的目的是解释我自己在学习过程的一些疑惑。
Netty主要包含channel, 回调,Future,事件驱动及channelhandler.
回调的概念就是当你触发了某个事件,然后就自动调用相应的方法,比如JAVA SWING, 当你点击某个按钮,那就触发了按钮的事件,然后去调用相应的方法。
Future属于多线程里面的,线程启动然后执行,你并不能控制它什么时候结束,比如A线程启动,计算一个结果,主函数main希望获得A线程的结果,你可以通过Thread.join方式等待A执行完,再执行主方法,这种方式效率太低,因此出现了future,把线程放在future task里面执行,主函数继续运行,然后通过future.get方式获得结果,如果线程还没有执行完,会堵塞,直到完成。如果已经完成,那么通过get直接获得结果。
事件驱动就是触发了某个事件,这个和回调相结合,比如channelactive方法,当客户端连接到服务端,就会触发建立一条通道channel的事件, 这个时候channelactive就被调用了。
channelHandler顾名思义是拦截器,用来处理通道的数据的东西, channelhandler分in和out,对于客户端来说,进入客户端的数据就叫入站,数据传输到服务端就叫出站,对于服务端来说概念一样。
先来看一个示例:客户端连接到服务端发送Netty Rocks, 服务端收到信息打印出来,再发送welcome client给客户端
ServerHandler

package com.isesol.log.log; import java.net.InetAddress; import java.util.ArrayList; import java.util.Date; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; @Sharable public class ServerHandler extends SimpleChannelInboundHandler { @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // TODO Auto-generated method stubSystem.out.println(msg.toString(CharsetUtil.UTF_8)); ChannelFuture future = ctx.write(Unpooled.copiedBuffer("welcome client!",CharsetUtil.UTF_8)); // ChannelFuture future = ctx.write(Unpooled.copiedBuffer("welcomee Client", CharsetUtil.UTF_8)); }}

Server:
package com.isesol.log.log; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); public static void main(String[] args) throws InterruptedException {//服务端有2个EventLoopGroup, 一个服务于自己,一个服务客户端, 也可以只用一个EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //指定channel格式 .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {@Override public void initChannel(SocketChannel ch) throws Exception { //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // ch.pipeline().addLast(DECODER); // ch.pipeline().addLast(ENCODER); // ch.pipeline().addLast(new ServerHandler2()); //添加了一个handler,用来处理通道数据 ch.pipeline().addLast(new ServerHandler()); }}); b.bind(8092).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

ClientHandler
package com.isesol.log.log; import java.net.InetAddress; import java.nio.charset.Charset; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.util.CharsetUtil; @Sharable public class ClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.write(Unpooled.copiedBuffer("Netty Rocks", CharsetUtil.UTF_8)); ctx.flush(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // TODO Auto-generated method stubSystem.out.println(msg.toString(CharsetUtil.UTF_8)); }}

Client:
package com.isesol.log.log; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class Client { private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); public static void main(String[] args) throws InterruptedException, IOException {EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Override public void initChannel(SocketChannel ch) throws Exception { // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // ch.pipeline().addLast(DECODER); //ch.pipeline().addLast(ENCODER); ch.pipeline().addLast(new ClientHandler()); }}); ; // Start the connection attempt. Channel ch = b.connect("127.0.0.1", 8092).sync().channel(); // Read commands from the stdin. ChannelFuture lastWriteFuture = null; BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); for (; ; ) { String line = in.readLine(); if (line == null) { break; }// Sends the received line to the server. lastWriteFuture = ch.writeAndFlush(Unpooled.copiedBuffer(line,CharsetUtil.UTF_8)); // If user typed the 'bye' command, wait until the server closes // the connection. if ("bye".equals(line.toLowerCase())) { ch.closeFuture().sync(); break; } }// Wait until all messages are flushed before closing the channel. if (lastWriteFuture != null) { lastWriteFuture.sync(); } } finally { group.shutdownGracefully(); } }}

然后打包成JAR,执行结果如下:
[root@datanode01-ucloud ~]# hadoop jar server.jar 18/08/21 01:38:50 INFO logging.LoggingHandler: [id: 0x4d2022aa] REGISTERED 18/08/21 01:38:50 INFO logging.LoggingHandler: [id: 0x4d2022aa] BIND(0.0.0.0/0.0.0.0:8092) 18/08/21 01:38:50 INFO logging.LoggingHandler: [id: 0x4d2022aa, /0.0.0.0:8092] ACTIVE 18/08/21 01:38:55 INFO logging.LoggingHandler: [id: 0x4d2022aa, /0.0.0.0:8092] RECEIVED: [id: 0xdd0eafe7, /127.0.0.1:36936 => /127.0.0.1:8092] its going to handler 1 Netty Rocks

[root@datanode01-ucloud ~]# hadoop jar client.jar welcome client!

【Netty应用】ChannelPipeline 和ChannelHandler的关系:
服务端与客户端连接之后,会建立一个通道channel, 数据的传输就是通过ChannelPipeline, ChannelHandler就是挂在ChannelPipeline拦截器,用来处理数据,分入站和出站。你可以在ChannelPipeline上挂无数个ChannelHandler,也可以一个不挂。把ChannelPipeline想象成一条水管,然后水管上有很多水龙头,水缓缓的从水管流入或者流出,每个水龙头作为一个拦截器,用来处理这些水。
Netty应用
文章图片

inboundhandler的执行顺序是从左到右,outbound是从右到左,在添加handler到pipeline的时候,顺序非常重要。下面是先执行ENcoder, 再执行DiscardHandle。
ch.pipeline() .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new DiscardHandle());

outboundhandler的执行顺序相反,先从尾端处理,假设上面的2个handler属于outbound,那么处理顺序是DiscardHandle再StringEncoder.
最简单的理解就是 inbound 是从头到尾,outbound是从尾到头,addlast的意思添加到最后的意思,上面添加了2次,很显然stringencoder在前面,而对于outbound来说,DiscardHandler是最后,属于尾端,先执行。

Encoder和Deconder:
网络数据传输的单位是字节,ByteBuf是数据的容器,如果你发送的数据本身就是字节类型,那么是不需要encoder和decoder的,从上面的示例可以看到,我并没有添加任何encoder和decoder channelhandler,但是当你发送的数据类型不是字节,那么就一定要使用encoder和decoder, 否者服务端或者客户端是无法读取你的数据。网络传输还涉及到半包,粘包的概念,这个建议参考专门的文章。
@Sharable public class DiscardClient extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } }

上面的SimpleChannelInboundHandler表示数据传输是ByteBuf,因此在ctx.writeAndFlush的时候以ByteBuf的方式发送数据。现在我来修改成以String发送数据,这个时候你就会需要Encoder/Decoder,encoder的概念是把Sting包装成 ByteBuf再发送, Decoder接受到ByteBuf再解码成String. 如果没有这2个动作,你是无法发送数据和接受数据的,上面已经解释了,网络数据传输一定是字节。
修改inbound为String,发送数据的时候直接以String发送:
package com.isesol.elasticsearch; import java.nio.charset.Charset; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.channel.ChannelHandler.Sharable; @Sharable public class DiscardHandle extends SimpleChannelInboundHandler { @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { // TODO Auto-generated method stub System.out.println(msg); ctx.writeAndFlush(msg); }}

那么在添加handler的时候就必须要添加encoder:
try{ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new DiscardHandle()); } });

发送数据的时候先通过encoder把 String封装成ByteBuf,Decoder把接受进来的ByteBuf解码成String再打印。
去掉encoder handler来看一下实际结果,服务端收到了Netty rocks, 但是客户端什么也没有输出,按照规则应该输出服务端发送的数据,这就是因为没有encoder string为ByteBuf,导致数据根本发不了:
[root@datanode01-ucloud ~]# [root@datanode01-ucloud ~]# hadoop jar service.jar 18/08/21 10:52:58 INFO logging.LoggingHandler: [id: 0x6022f658] REGISTERED 18/08/21 10:52:58 INFO logging.LoggingHandler: [id: 0x6022f658] BIND(/127.0.0.1:8888) 18/08/21 10:52:58 INFO logging.LoggingHandler: [id: 0x6022f658, /127.0.0.1:8888] ACTIVE 18/08/21 10:53:01 INFO logging.LoggingHandler: [id: 0x6022f658, /127.0.0.1:8888] RECEIVED: [id: 0x703db179, /127.0.0.1:56146 => /127.0.0.1:8888] Netty rocks!

^C[root@datanode01-ucloud ~]# hadoop jar client.jar

疑问1:之前不是说inbound会按照添加的handler顺序处理数据吗?那我通过DiscardHandler发送数据,怎么还会执行Encoder ? 顺序明明是 encoder, decoder, discardhandler.
为了证明到底是否执行了encoder, decoder,我们需要做一个实验,StringEncoder, StringDecoder是netty提供的,我们把它们拷贝到自己的类中,然后在构造函数添加一个打印,一旦调用了这2个handler,就肯定会打印数据。 这2个类名字取为myStringEncoder, myStringDecoder, 测试2种情况,只发不收和只收不发,这样才知道整个过程是怎么样:
package com.isesol.elasticsearch; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.MessageToMessageEncoder; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.List; @Sharable public class myStringEncoder extends MessageToMessageEncoder{ // TODO Use CharsetEncoder instead. private final Charset charset; /** * Creates a new instance with the current system character set. */ public myStringEncoder() { this(Charset.defaultCharset()); }/** * Creates a new instance with the specified character set. */ public myStringEncoder(Charset charset) {System.out.println("invoker encoder"); if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; }@Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List out) throws Exception { if (msg.length() == 0) { return; }System.out.println("start to encoder!!"); out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset)); }}
package com.isesol.elasticsearch; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.MessageToMessageDecoder; import java.nio.charset.Charset; import java.util.List; public class myStringDecoder extends MessageToMessageDecoder {private final Charset charset; /** * Creates a new instance with the current system character set. */ public myStringDecoder() { this(Charset.defaultCharset()); }/** * Creates a new instance with the specified character set. */ public myStringDecoder(Charset charset) {System.out.println("Decoder invoke"); if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; }@Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { System.out.println("start to decoder!!"); out.add(msg.toString(charset)); } }
try{ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new myStringEncoder(CharsetUtil.UTF_8)) .addLast(new myStringDecoder(CharsetUtil.UTF_8)) .addLast(new DiscardHandle()); } });

只发不收:
[root@datanode01-ucloud ~]# hadoop jar service.jar 18/08/21 11:58:39 INFO logging.LoggingHandler: [id: 0xa212be6f] REGISTERED 18/08/21 11:58:39 INFO logging.LoggingHandler: [id: 0xa212be6f] BIND(/127.0.0.1:8888) 18/08/21 11:58:39 INFO logging.LoggingHandler: [id: 0xa212be6f, /127.0.0.1:8888] ACTIVE 18/08/21 11:58:43 INFO logging.LoggingHandler: [id: 0xa212be6f, /127.0.0.1:8888] RECEIVED: [id: 0x5a6bb294, /127.0.0.1:33552 => /127.0.0.1:8888] invoker encoder Decoder invoke DiscardHandler invoke start to encoder!!

^C[root@datanode01-ucloud ~]# hadoop jar client.jar Client received: this is from SERVER

从server端的调用顺序先encoder,再decoder,再执行DiscardHandler,发送的时候执行封装(start to encoder)。注意一点,虽然调用了decoder handler,但实际上没有去执行decoder的解码动作。
只收不发:
[root@datanode01-ucloud ~]# hadoop jar service.jar 18/08/21 12:06:15 INFO logging.LoggingHandler: [id: 0xb7f24453] REGISTERED 18/08/21 12:06:15 INFO logging.LoggingHandler: [id: 0xb7f24453] BIND(/127.0.0.1:8888) 18/08/21 12:06:15 INFO logging.LoggingHandler: [id: 0xb7f24453, /127.0.0.1:8888] ACTIVE 18/08/21 12:06:18 INFO logging.LoggingHandler: [id: 0xb7f24453, /127.0.0.1:8888] RECEIVED: [id: 0x86bc1f79, /127.0.0.1:37392 => /127.0.0.1:8888] invoker encoder Decoder invoke DiscardHandler invoke start to decoder!! Netty rocks!

调用顺序还是保持不变,最后执行decoder解码。
最后一个疑问,发数据的时候虽然掉用了decoder,但实际没有执行解码动作,收数据调用了encoder,也没有进行编码动作,这是为什么? 谁在背后操纵? 个人理解调用handler的时候会全部启动,但是至于执行不执行看实际逻辑,如果是进来的数据,很显然不需要encoder,那么就不会去执行。
Encode from one message to an other. This method will be called for each written message that can be handled by this encoder.

疑问2:之前我们提过入站inbound ,出站为outbound, 我之前以为出去的数据必须经过 outbound,但是上面的示例中并没有添加outbound, 数据照样能够发送, 网上也有人问inbound 和outbound到底有什么区别?
在上面的基本概念中我提过,pipeline上可以挂无数个handler,也可以一个不挂。 handler仅仅是拦截器,需要就挂上去,不需要就不挂,发送和接收数据与handler毫无关系。就好比水管上的水龙头,水要流入或者流出,与水龙头本身没有关系,仅仅是你需要去控制水流的时候才把水龙头挂到水管上。
我们来建立一个outbound,看看添加了outbound会如何?
package com.isesol.elasticsearch; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; @Sharable public class DiscardOutHandler extends ChannelOutboundHandlerAdapter { @Override public void read(ChannelHandlerContext ctx) throws Exception {System.out.println("DiscardOutHandler has been invoker!!"); ctx.read(); }@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.writeAndFlush((ByteBuf) msg, promise); }}

try { final Bootstrap b = new Bootstrap(); b.group(boss).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new DiscardClient()) .addLast(new DiscardOutHandler()); }});

测试结果:
[root@datanode01-ucloud ~]# hadoop jar service.jar 18/08/21 12:25:54 INFO logging.LoggingHandler: [id: 0x9f697a1d] REGISTERED 18/08/21 12:25:54 INFO logging.LoggingHandler: [id: 0x9f697a1d] BIND(/127.0.0.1:8888) 18/08/21 12:25:54 INFO logging.LoggingHandler: [id: 0x9f697a1d, /127.0.0.1:8888] ACTIVE 18/08/21 12:25:59 INFO logging.LoggingHandler: [id: 0x9f697a1d, /127.0.0.1:8888] RECEIVED: [id: 0x20c8a8aa, /127.0.0.1:47458 => /127.0.0.1:8888] invoker encoder Decoder invoke DiscardHandler invoke start to decoder!! Netty rocks!

[root@datanode01-ucloud ~]# [root@datanode01-ucloud ~]# hadoop jar client.jar DiscardOutHandler has been invoker!!

从上面图片看到,添加了outbound的后,确实调用了,只不过我这个outbound没有做任何处理,仅仅是ctx.read, 仅仅是forware to the next ChannelOutboundler。看具体含义;
Calls ChannelHandlerContext.read() to forward to the next ChannelOutboundHandler in the ChannelPipeline. Sub-classes may override this method to change behavior.

我们再来做一个实验,客户端在inbound里面执行ctx.writeandflush,发送Netty rocks经过了outbound再出去,我们在outbound来修改要发送的数据,修改了2个方法:
package com.isesol.elasticsearch; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.util.CharsetUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; @Sharable public class DiscardOutHandler extends ChannelOutboundHandlerAdapter { @Override public void read(ChannelHandlerContext ctx) throws Exception {System.out.println("DiscardOutHandler has been invoker!!"); ctx.writeAndFlush(Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8)); }@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(Unpooled.copiedBuffer("dogshit!!", CharsetUtil.UTF_8), promise); }}

实验结果,服务端不仅收到了Netty rocks, 还收到了outbound 发送的hello world:
[root@datanode01-ucloud ~]# hadoop jar service.jar 18/08/21 12:49:31 INFO logging.LoggingHandler: [id: 0x53b17dcf] REGISTERED 18/08/21 12:49:31 INFO logging.LoggingHandler: [id: 0x53b17dcf] BIND(/127.0.0.1:8888) 18/08/21 12:49:31 INFO logging.LoggingHandler: [id: 0x53b17dcf, /127.0.0.1:8888] ACTIVE 18/08/21 12:49:36 INFO logging.LoggingHandler: [id: 0x53b17dcf, /127.0.0.1:8888] RECEIVED: [id: 0x60c153f4, /127.0.0.1:59732 => /127.0.0.1:8888] invoker encoder Decoder invoke DiscardHandler invoke start to decoder!! Netty rocks!hello world

[root@datanode01-ucloud ~]# hadoop jar client.jar DiscardOutHandler has been invoker!!

上面已经介绍了数据传输,编码解码,channelhandler,channelpipeline等主要的netty结构。还有一个future没有讲,我们随便在服务端或者客户端添加一个listener,用来监听动作是否成功失败。修改DiscardClient:
package com.isesol.elasticsearch; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; @Sharable public class DiscardClient extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)).addListener(listener); } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); }private final ChannelFutureListener listener = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { // TODO Auto-generated method stub if (future.isSuccess()) { System.out.println("send is success!"); } else { future.cause().printStackTrace(); future.channel().close(); } } }; }

结果如下, 发送数据成功之后,会调用listener的方法,打印send is success:
[root@datanode01-ucloud ~]# hadoop jar client.jar send is success! DiscardOutHandler has been invoker!!

现在我故意发送失败,上面发送的数据格式应该为ByteBuf,然后我们来发送String。
[root@datanode01-ucloud ~]# hadoop jar client.jar send is failed java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion) at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:281) at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:707) at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:691) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:681) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:716) at com.isesol.elasticsearch.DiscardClient.channelActive(DiscardClient.java:18) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:169) at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:817) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:260) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:290) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) DiscardOutHandler has been invoker!!


    推荐阅读