Netty|Netty 示例6 集成Protobuf
【Netty|Netty 示例6 集成Protobuf】.proto文件
$ protoc --java_out=src/main/java src/protobuf/test.proto
syntax = 'proto2';
package org.baozi.rpc.protobuf;
option optimize_for = SPEED;
option java_package = "org.baozi.netty.test006";
option java_outer_classname = "MyDataInfo";
message Person {
required string name = 1;
optional int32 age = 2;
optional string address = 3;
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestServerInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler {@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
System.out.println(msg.getName());
System.out.println(msg.getAge());
System.out.println(msg.getAddress());
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TestClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestClientInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestClientHandler extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyDataInfo.Person person = MyDataInfo.Person.newBuilder()
.setName("张三").setAge(20).setAddress("北京").build();
// 在TestClientInitializer自定义的Handler中,有编码解码(序列化和反序列化
ctx.channel().writeAndFlush(person);
}@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {}
}
多协议消息支援
syntax = 'proto2';
package org.baozi.rpc.protobuf;
option optimize_for = SPEED;
option java_package = "org.baozi.netty.test006";
option java_outer_classname = "MyDataInfo";
// 多消息的思想就是:一个枚举标识一个Messagemessage MyMessage {enum DataType{
PersontType = 1;
DogType = 2;
CatType = 3;
}// 指定DataType中的其中一个
required DataType data_type = 1;
// oneof 只会保留其中一个
oneof dataBody {
Person person = 2;
Dog dog = 3;
Cat cat = 4;
}
}// 多个消息message Person {
optional string name = 1;
optional int32 age = 2;
optional string address = 3;
}message Dog {
optional string name = 1;
optional int32 age = 2;
}message Cat {
optional string name = 1;
optional string city = 2;
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestClientInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
public class TestClientHandler extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int randomInt = new Random().nextInt(3);
// 模拟
MyDataInfo.MyMessage myMessage = null;
if (0 == randomInt) {
myMessage = MyDataInfo.MyMessage.newBuilder()
// 设置MyMessage中data_type字段
.setDataType(MyDataInfo.MyMessage.DataType.PersontType)
// 设置MyMessage中oneof里其中之一个message
.setPerson(MyDataInfo.Person.newBuilder()
.setName("张三").setAge(20).setAddress("北京").build())
.build();
} else if (1 == randomInt) {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.DogType)
.setDog(MyDataInfo.Dog.newBuilder()
.setName("一只狗").setAge(2).build())
.build();
} else {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.PersontType)
.setCat(MyDataInfo.Cat.newBuilder()
.setName("一只猫").setCity("上海").build())
.build();
}ctx.channel().writeAndFlush(myMessage);
}@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestServerInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler {@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.PersontType) {
MyDataInfo.Person person = msg.getPerson();
System.out.println(person.getName());
System.out.println(person.getAge());
System.out.println(person.getAddress());
} else if (dataType == MyDataInfo.MyMessage.DataType.DogType) {
MyDataInfo.Dog dog = msg.getDog();
System.out.println(dog.getName());
System.out.println(dog.getAge());
} else {
MyDataInfo.Cat dog = msg.getCat();
System.out.println(dog.getName());
System.out.println(dog.getCity());
}}
}
推荐阅读
- Activiti(一)SpringBoot2集成Activiti6
- 逻辑回归的理解与python示例
- 私有化轻量级持续集成部署方案--03-部署web服务(下)
- Spring集成|Spring集成 Mina
- Spring|Spring Cloud Feign实现文件上传下载的示例代码
- C语言进阶栈帧示例详解教程
- OpenCV|OpenCV-Python实战(18)——深度学习简介与入门示例
- Python机器学习基础与进阶|Python机器学习--集成学习算法--XGBoost算法
- c#中task与thread区别及其使用的方法示例
- locust实例