Netty 示例6:集成 Protobuf

【Netty 示例6:集成 Protobuf】.proto 文件(先用下面的命令生成 Java 代码)
$ protoc --java_out=src/main/java src/protobuf/test.proto

// Schema for the single-message example: one Person message, generated into
// the Java outer class org.baozi.netty.test006.MyDataInfo.
syntax = 'proto2';

package org.baozi.rpc.protobuf;

option optimize_for = SPEED;
option java_package = "org.baozi.netty.test006";
option java_outer_classname = "MyDataInfo";

// A person record; only the name is mandatory.
message Person {
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class TestServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new TestServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class TestServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); } }

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side handler for the single-message example: prints each received Person.
 *
 * <p>The type argument is required: with a raw {@code SimpleChannelInboundHandler} the
 * {@code channelRead0(ChannelHandlerContext, MyDataInfo.Person)} method would not override
 * anything and the {@code @Override} annotation would fail to compile.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {

    /**
     * Prints the name, age and address of the received message to standard output.
     *
     * @param ctx the handler context (unused)
     * @param msg the decoded Person message
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
        System.out.println(msg.getName());
        System.out.println(msg.getAge());
        System.out.println(msg.getAddress());
    }
}

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Entry point of the Protobuf demo client.
 *
 * <p>Connects to {@code localhost:8899} with {@link TestClientInitializer} as the channel handler,
 * then blocks until the connection is closed.
 */
public class TestClient {

    /**
     * Connects to the server and waits for the channel to close.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on connect or close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(workers);
            client.channel(NioSocketChannel.class);
            client.handler(new TestClientInitializer());

            ChannelFuture connected = client.connect("localhost", 8899).sync();
            // Park the main thread until the connection is closed.
            connected.channel().closeFuture().sync();
        } finally {
            // Always release the event loop group, even if connect or the wait failed.
            workers.shutdownGracefully();
        }
    }
}

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class TestClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestClientHandler()); } }

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class TestClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { MyDataInfo.Person person = MyDataInfo.Person.newBuilder() .setName("张三").setAge(20).setAddress("北京").build(); // 在TestClientInitializer自定义的Handler中,有编码解码(序列化和反序列化 ctx.channel().writeAndFlush(person); }@Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {} }

多协议消息支援
// Schema for the multi-message example. Line breaks matter here: each `//`
// comment runs to the end of its line, so the definitions that follow a comment
// must start on a new line or they become part of the comment.
syntax = 'proto2';

package org.baozi.rpc.protobuf;

option optimize_for = SPEED;
option java_package = "org.baozi.netty.test006";
option java_outer_classname = "MyDataInfo";

// The multi-message approach: one enum value identifies each message type.
message MyMessage {
    // NOTE: the spelling "PersontType" is kept as-is because the Java client and
    // server handlers reference the generated constant by this name.
    enum DataType {
        PersontType = 1;
        DogType = 2;
        CatType = 3;
    }

    // Names which one of the DataType values this message carries.
    required DataType data_type = 1;

    // A oneof keeps only one of its members set at a time.
    oneof dataBody {
        Person person = 2;
        Dog dog = 3;
        Cat cat = 4;
    }
}

// The individual payload messages.
message Person {
    optional string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Dog {
    optional string name = 1;
    optional int32 age = 2;
}

message Cat {
    optional string name = 1;
    optional string city = 2;
}

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class TestClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestClientHandler()); } }

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.Random; public class TestClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { int randomInt = new Random().nextInt(3); // 模拟 MyDataInfo.MyMessage myMessage = null; if (0 == randomInt) { myMessage = MyDataInfo.MyMessage.newBuilder() // 设置MyMessage中data_type字段 .setDataType(MyDataInfo.MyMessage.DataType.PersontType) // 设置MyMessage中oneof里其中之一个message .setPerson(MyDataInfo.Person.newBuilder() .setName("张三").setAge(20).setAddress("北京").build()) .build(); } else if (1 == randomInt) { myMessage = MyDataInfo.MyMessage.newBuilder() .setDataType(MyDataInfo.MyMessage.DataType.DogType) .setDog(MyDataInfo.Dog.newBuilder() .setName("一只狗").setAge(2).build()) .build(); } else { myMessage = MyDataInfo.MyMessage.newBuilder() .setDataType(MyDataInfo.MyMessage.DataType.PersontType) .setCat(MyDataInfo.Cat.newBuilder() .setName("一只猫").setCity("上海").build()) .build(); }ctx.channel().writeAndFlush(myMessage); }@Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {} }

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class TestServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); } }

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side handler for the multi-message example: inspects the {@code data_type} of each
 * received MyMessage and prints the fields of the corresponding payload.
 *
 * <p>The type argument is required: with a raw {@code SimpleChannelInboundHandler} the
 * {@code channelRead0(ChannelHandlerContext, MyDataInfo.MyMessage)} method would not override
 * anything and the {@code @Override} annotation would fail to compile.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    /**
     * Prints the payload selected by the message's data type to standard output.
     *
     * @param ctx the handler context (unused)
     * @param msg the decoded MyMessage wrapper
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        switch (msg.getDataType()) {
            case PersontType: {
                MyDataInfo.Person person = msg.getPerson();
                System.out.println(person.getName());
                System.out.println(person.getAge());
                System.out.println(person.getAddress());
                break;
            }
            case DogType: {
                MyDataInfo.Dog dog = msg.getDog();
                System.out.println(dog.getName());
                System.out.println(dog.getAge());
                break;
            }
            default: {
                // Any other data type is treated as a Cat, as in the original else branch.
                MyDataInfo.Cat cat = msg.getCat();
                System.out.println(cat.getName());
                System.out.println(cat.getCity());
                break;
            }
        }
    }
}

    推荐阅读