《Netty学习打卡--从小白到放弃》----- 15 - netty 之grpc Simple RPC 例子

打卡日期(2019-07-17)
学习要点

-1.StreamObserver -2.ManagedChannel -3.ManagedChannelBuilder -4.Runtime -5.

1.StreamObserver 【《Netty学习打卡--从小白到放弃》----- 15 - netty 之grpc Simple RPC 例子】通过StreamObserver的匿名类来处理消息的返回。
  • onNext(obj) : 将下一个要返回的消息存入,包装返回的消息
  • onCompleted():所有的消息已经存入,关闭本次连接
2.ManagedChannel 是客户端最核心的类,它表示逻辑上的一个channel。底层持有一个物理的transport(TCP通道),并负责维护transport的活性。
在RPC调用的任何时机,如果检测到底层的transport处于关闭状态(terminated),将会尝试重新创建transport。
通常情况下我们不需要再RPC调用结束之后就关闭Channel,Channel可以一直被重用,知道Client不在需要或者Client出现异常而无法使用。为了提高Client的应用的整体并发能力,我们可以提供连接池模式,创建多个ManagedChannel,使用轮询、随机等算法,每次请求RPC的时候选择一个Channel即可。
3.ManagedChannelBuilder ManagedChannelBuilder用来创建客户端的channel,使用了provider机制,具体创建哪种channel由provider决定。
利用grpc完成一个简单的Simple RPC列子
步骤
  • 1.配置grpc依赖包
  • 2.编写proto文件
  • 3.利用gradle generateProto生成java类
  • 4.根据官网的demo,编写Server/Client服务
1.配置grpc依赖包 官网demo
根据官网的例子,一步步配置grpc依赖的包
group 'com.grpc.gradle.study' version '1.0-SNAPSHOT'apply plugin: 'java' apply plugin: 'com.google.protobuf'sourceCompatibility = 1.8 targetCompatibility = 1.8repositories { mavenCentral() }dependencies { compile ( "junit:junit:4.12", "io.netty:netty-all:4.1.37.Final", //netty依赖 "io.grpc:grpc-netty-shaded:1.22.1", //protobuf依赖 "io.grpc:grpc-protobuf:1.22.1", //grpc stub桩依赖 "io.grpc:grpc-stub:1.22.1" ) }buildscript { repositories { mavenCentral() } dependencies { classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8' } } protobuf { protoc { artifact = "com.google.protobuf:protoc:3.7.1" } plugins { grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.22.1' } } generateProtoTasks { all()*.plugins { grpc {} } } }

需要着重说明一下 proto文件默认放在 src/main/proto 或 src/test/proto路径下面
2.编写proto文件
syntax = "proto3"; package study; option java_package = "com.dragon.study"; option java_outer_classname = "Student"; option optimize_for = SPEED; option java_multiple_files = true; message StudentRequest{ string username = 1; }message StudentResponse{ string realname = 1; }service StudentService{ rpc GetRealNameByUserName(StudentRequest) returns (StudentResponse) {} }

3.利用gradle generateProto生成java类
运行命令: gradle generateProto

4.根据官网的demo,编写Server/Client服务 Server
package com.dragon.study.server; import io.grpc.Server; import io.grpc.ServerBuilder; import java.io.IOException; import java.util.logging.Logger; /** * @Auther: lijianlong * @Date: 2019/7/17 15:04 * @Description: */ public class StudentServer { private static final Logger logger = Logger.getLogger(StudentServer.class.getName()); // grpc 服务启动项 private Server server; private final int port = 8080; public static void main(String[] args) throws IOException, InterruptedException { final StudentServer server = new StudentServer(); server.start(); //为了阻塞服务,使得服务器一直启动状态 server.blockUntilShutDown(); }//启动服务器方法 private void start() throws IOException { server = ServerBuilder.forPort(port) .addService(new StudentServiceImpl()) .build() .start(); logger.info("project start , listening on" + port); // 回调钩子,在jvm关闭之前,通过钩子完成一些收尾的工作 /*** * addShutdownHook 注册一个新的虚拟机关闭的钩子,java虚拟机关闭是为了响应两种事件 * 1.程序正常的退出,当最后一个线程退出,或者当exit()被调用的时候回调用 * 2.为了响应用户的中断,或者系统的关闭。例如:服务挂掉等 * addShutdownHook关闭的钩子就是一种初始化但尚未开启的线程 */ Runtime.getRuntime().addShutdownHook(new Thread(()->{ System.out.println("关闭jvm"); StudentServer.this.stop(); })); }private void stop(){ System.out.println("执行关闭服务"); if(null != server){ server.shutdown(); } }// 阻塞服务,不让服务退出 private void blockUntilShutDown() throws InterruptedException { if(null != server){ /*** * awaitTermination 利用Synchronized 来保证锁的安全性 */ server.awaitTermination(); } } }

package com.dragon.study.server; import io.grpc.Server; import io.grpc.ServerBuilder; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** * @Auther: lijianlong * @Date: 2019/7/17 15:04 * @Description: */ public class StudentServer { private static final Logger logger = Logger.getLogger(StudentServer.class.getName()); // grpc 服务启动项 private Server server; private final int port = 8080; public static void main(String[] args) throws IOException, InterruptedException { final StudentServer server = new StudentServer(); server.start(); server.blockUntilShutDown(); }//启动服务器方法 private void start() throws IOException { server = ServerBuilder.forPort(port) .addService(new StudentServiceImpl()) .build() .start(); logger.info("project start , listening on" + port); // 回调钩子,在jvm关闭之前,通过钩子完成一些收尾的工作 Runtime.getRuntime().addShutdownHook(new Thread(()->{ System.out.println("关闭jvm"); StudentServer.this.stop(); })); }private void stop(){ System.out.println("执行关闭服务"); if(null != server){ server.shutdown(); } }// 阻塞服务,不让服务退出 private void blockUntilShutDown() throws InterruptedException { if(null != server){ server.awaitTermination(); } } }

Client
package com.dragon.study.client; import com.dragon.study.StudentRequest; import com.dragon.study.StudentResponse; import com.dragon.study.StudentServiceGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class StudentClient { private static final Logger logger = Logger.getLogger(StudentClient.class.getName()); //channel相当于一个连接,客户端核心类 private final ManagedChannel channel; private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub; public StudentClient(String host , int port){ //ManagedChannelBuilder 管理客户端的链接,用来创建链接 this(ManagedChannelBuilder.forAddress(host,port).usePlaintext().build()); }public StudentClient(ManagedChannel channel) { this.channel = channel; blockingStub = StudentServiceGrpc.newBlockingStub(channel); }public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(100,TimeUnit.SECONDS); }public void getRealName(String name){ StudentRequest request = StudentRequest.newBuilder().setUsername(name).build(); StudentResponse response; response = blockingStub.getRealNameByUserName(request); System.out.println(response.getRealname()); }public static void main(String[] args) throws InterruptedException { StudentClient client = new StudentClient("localhost",8080); try{ String username = "张三"; if(args.length > 0){ username = args[0]; } client.getRealName(username); }finally { client.shutdown(); }}}

    推荐阅读