netty-grpc四种方法

single-single protobuf

rpc searchStudent(Request) returns(Student){}

handler
public void searchStudent(Message.Request request, StreamObserver responseObserver) { String param = request.getParam(); Message.Student student = Message.Student.newBuilder().setAddr("localhost").setId(99).setName(param).build(); responseObserver.onNext(student); responseObserver.onCompleted(); }

通过streamObserver进行返回。
onNext:添加返回值
onCimpleted:返回
Client
创建stub
ManagedChannel managedChannel = ManagedChannelBuilder .forAddress("localhost",8989) .usePlaintext() .build(); StudentServiceGrpc.StudentServiceBlockingStub stub = StudentServiceGrpc.newBlockingStub(managedChannel);

方法调用总需要这个,先提出来。
Message.Student student = stub.searchStudent( Message.Request.newBuilder() .setParam("godme").build() ); System.out.println(student.toString());

这调用最方便了,直接传参并获取返回即可。
single-stream protobuf
rpc addStudent(Request) returns(stream Student){}

入参一个,返回类型为stream Student
handler
public void addStudent(Message.Request request, StreamObserver responseObserver) { Integer count = Integer.valueOf(request.getParam()); while(count-- > 0){ responseObserver.onNext(Message.Student.newBuilder() .setAddr("localhost") .setName("godme") .setId(count) .build() ); } responseObserver.onCompleted(); }

还是先按照原来的理解吧
onNext:添加返回值
onCompleted:返回
嗯,啥都没变,不过可以多return几个值。
Client
Iterator it = stub.addStudent( Message.Request.newBuilder() .setParam("9") .build() ); while(it.hasNext()){ System.out.println(it.next().toString()+ "\n*****************************"); }

就和handler中理解的一样,可以有多个返回值。
在客户端一处,这些返回值会组合成一个iterator进行返回。
我们去便利并处理即可。
stream简单理解 onNext就好比write
onCompleted就好比flush
不过这类比的是channel中的流
这里的stream是概念的流,主要感觉是源源不断
同时自动给我们解包,拆分成单个对象。
stream-single protobuf
rpc countStudent(stream Request) returns( Request){}

handler
public StreamObserver countStudent(StreamObserver responseObserver) { return new StreamObserver() { private int count = 0; @Override public void onNext(Message.Request value) { System.out.println("receive request : " + value.getParam()); count += 1; } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onCompleted() { responseObserver.onNext( Message.Request.newBuilder() .setParam(String.valueOf(count)) .build() ); responseObserver.onCompleted(); } }; }

画风突变,最明显的就是有返回值了,而且代码更多了。简单梳理下:
  • StreamObserver
我们接受的请求参数没有单个的了,只有一个streamObserver
同时,我们的返回值也必须是一个observer,只能手动创建
  • 结构
observer中还是三个方法:
onNext:原来我们调用onNext是作为添加返回值,这里作为的是接受处理传过来的多个请求
我们最原始的stream理解就是多个数据,这个就相当于是迭代iterator请求参数进行处理。
因为我们请求参数是stream,也就是多个,需要单个进行处理。
这里我们只做记录请求个数的操作。
onError:错误处理回调方法
onCompleted:标识方法结束,此时把处理结果返回出去responseObserver.onNext
  • 归纳
    • 两个处理器
    把我们远程请求的方法看做一个处理器,远程去调用的时候就是执行处理器处理信息。
    此时,我们的onNext就是添加返回,onCompleted就是返回数据。
    我们返回的也当做一个处理器,onNext就是处理每一个数据,onCompleted就是全部处理完毕。
    • 异步调用
    当请求开始处理的时候,我们定义的方法,也就是第一个处理器被调用,但不是实时的执行了。
    后续会自己启动第二个处理器,处理完毕之后再操作,接上第一个处理器的onNext添加返回值。
client first_handler second_handler 请求 响应 异步处理 second_onNext遍历处理 second_Completed处理结束,调用first_onNext添加处理结果 first_Completed返回处理结果 client first_handler second_handler
我所理解的就是这样
首先,这是一个异步的处理,我们需要做两个事情
  1. 完善处理方法,也就是第一个handler
  2. 定制具体处理办法,第二个handler
也即是说,我们真的返回还是靠的入参的observerreturnobserver当做第二个handler
Client
StudentServiceGrpc.StudentServiceStub stub2 = StudentServiceGrpc.newStub(managedChannel); StreamObserver handlerStreamObserver= new StreamObserver(){ @Override public void onNext(Message.Request value) { System.out.println("result : " + value.getParam()); } @Override public void onCompleted() { System.out.println("stream - single run over"); } @Override public void onError(Throwable t) {} }; StreamObserverrequestStreamObserver= stub2.countStudent(handlerStreamObserver); int count = 8; while(count-->0){ requestStreamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build()); } requestStreamObserver.onCompleted(); Thread.sleep(3000);

异步stub
StudentServiceGrpc.StudentServiceBlockingStub stub = StudentServiceGrpc.newBlockingStub(managedChannel); StudentServiceGrpc.StudentServiceStub stub2 = StudentServiceGrpc.newStub(managedChannel);

对比一下,第一个是阻塞的,第二个是异步的。
同步的stub找不到对照的方法,只能异步的。
handler
StreamObserver handlerStreamObserver= new StreamObserver(){ @Override public void onNext(Message.Request value) { System.out.println("result : " + value.getParam()); } @Override public void onCompleted() { System.out.println("stream - single run over"); } @Override public void onError(Throwable t) { } };

和服务端形成对比,我们要调用,也需要定义一个回调observer
调用
StreamObserverrequestStreamObserver= stub2.countStudent(handlerStreamObserver); int count = 8; while(count-->0){ requestStreamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build()); } requestStreamObserver.onCompleted();

通过传入回调的observer,返回observer
然后onNext就可以一次性发起多个请求,onCompleted就标识结束了。
结束就结束了,没其他的了,具体处理由handler回调处理。
把手动发起的请求也算作handler
clientHandler2 clientHandler1 serverHandler1 serverHandler2 装载 请求 响应 异步处理 结果返回 异步接收 回调处理 clientHandler2 clientHandler1 serverHandler1 serverHandler2 我们一般操控的就是clientHandler1serverHandler1,直来直去的一次性处理。
即使是single-stream,也是一次请求,一次性解包处理。
即使不知道服务端返回多少个数据,但是一次性的iterator也能够一次性传输。
但是请求的话,第一个不知道有多少个,第二个问题就是不知道会延时多久。
一次性处理不仅不行,而且太慢,需要来一个处理一个,才有了异步。
这下,对于stream有了更深的感触:源源不断,不知几何,来一个处理一个,需异步。
归根结底,我们就是接收请求和处理请求异步分开了,具体的处理自动触发,不归我们管理。
所以netty中的bossGroupworkerGroup的角色这下也更深入了一下。
一个小坑
Thread.sleep(3000);

因为是异步的,所以需要等待一下。
要不请求有了响应,具体处理结果还没回来呢。
程序已经结束,还有什么回调呢。
stream-stream protobuf
rpc batchSearchStudent(stream Request) returns (stream Student){}

handler
public StreamObserver batchSearchStudent(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(Message.Request value) { responseObserver.onNext( Message.Student.newBuilder() .setId(Integer.valueOf(value.getParam())) .setName("godme") .setAddr("localhost") .build() ); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }

综合理解一下
handler1-onNext:添加返回值
handler1-completed:表示处理结束,返回返回值
handler2-onNext:处理单个请求
handler2-onCompleted:表示处理结束
h2_onNext h1_onNext h2_onCompleted h1_onCompleted client 计算结果,并添加 执行完毕,并通知 通知返回数据 返回计算结果 h2_onNext h1_onNext h2_onCompleted h1_onCompleted client 抛开层级划分:
onNext:单个数据处理办法
onCompleted:数据全部处理完毕
只不过,handler2不由我们进行调用,它onNext完毕之后自动onComplted
但是我们可以自定义区掌控handler1的动作啊。
stream-single的时候,多个参数,最后只返回一个,所以我们只能一次handler1-onNext
stream-stream的时候,我们就需要多次的handler1-onNext了。
client
StreamObserver streamStreamObserver = new StreamObserver() { @Override public void onNext(Message.Student student) { System.out.println(student.toString() + "===stream==="); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onCompleted() { System.out.println("stream over"); } }; StreamObserver streamObserver = stub2.batchSearchStudent(streamStreamObserver); while(count++ < 4){ streamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build()); } Thread.sleep(5000); }

single-stream处理办法一致。
小结 handler handler1-onNext:添加返回值
handler1-onCompleted:返回返回值
handler2-onNext:处理请求
handler2-onCompleted:处理完毕回调
处理的动作主要还是依靠handler1来进行把控。
是否返回,返回多少个值全部看如何调用handler1,是否标记为stream表现行为一致。
当请求参数为stream时候,handler2才会有用,但是返回行为只和handler1调用相关。
stream stream的含义就是:源源不断,个数不定。
当请求参数不标记为stream时,也即是仅有单个的请求时。
即是有多个的返回值,也就是返回值标记为stream,它会自动整合成iterator
但是当需要一次性处理多个请求但是只请求一次时,需要多次onNext时,就需要去定义observer
不论客户端还是服务端,这个时候都需要observer,也就该handler2出现了。
这个时候,都是异步进行处理,客户端也必须是异步的,都是架好流程自动回调处理。
parameter return mode handler2
- - single-single none
- stream single-stream none
stream - stream-single need
stream stream stream-stream need
【netty-grpc四种方法】代码在此

    推荐阅读