single-single
protobuf
rpc searchStudent(Request) returns(Student){}
handler
public void searchStudent(Message.Request request, StreamObserver responseObserver) {
String param = request.getParam();
Message.Student student = Message.Student.newBuilder().setAddr("localhost").setId(99).setName(param).build();
responseObserver.onNext(student);
responseObserver.onCompleted();
}
通过
streamObserver
进行返回。onNext
:添加返回值onCimpleted
:返回Client
创建stub
ManagedChannel managedChannel = ManagedChannelBuilder .forAddress("localhost",8989) .usePlaintext() .build(); StudentServiceGrpc.StudentServiceBlockingStub stub = StudentServiceGrpc.newBlockingStub(managedChannel);
方法调用总需要这个,先提出来。
Message.Student student = stub.searchStudent(
Message.Request.newBuilder()
.setParam("godme").build()
);
System.out.println(student.toString());
这调用最方便了,直接传参并获取返回即可。
single-stream
protobuf
rpc addStudent(Request) returns(stream Student){}
入参一个,返回类型为
stream Student
handler
public void addStudent(Message.Request request, StreamObserver responseObserver) {
Integer count = Integer.valueOf(request.getParam());
while(count-- > 0){
responseObserver.onNext(Message.Student.newBuilder()
.setAddr("localhost")
.setName("godme")
.setId(count)
.build()
);
}
responseObserver.onCompleted();
}
还是先按照原来的理解吧
onNext
:添加返回值onCompleted
:返回嗯,啥都没变,不过可以多
return
几个值。Client
Iterator it = stub.addStudent(
Message.Request.newBuilder()
.setParam("9")
.build()
);
while(it.hasNext()){
System.out.println(it.next().toString()+ "\n*****************************");
}
就和
handler
中理解的一样,可以有多个返回值。在客户端一处,这些返回值会组合成一个
iterator
进行返回。我们去便利并处理即可。
stream
简单理解onNext
就好比write
onCompleted
就好比flush
不过这类比的是channel
中的流
这里的stream
是概念的流,主要感觉是源源不断
。
同时自动给我们解包,拆分成单个对象。
stream-single
protobuf
rpc countStudent(stream Request) returns( Request){}
handler
public StreamObserver countStudent(StreamObserver responseObserver) {
return new StreamObserver() {
private int count = 0;
@Override
public void onNext(Message.Request value) {
System.out.println("receive request : " + value.getParam());
count += 1;
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onNext(
Message.Request.newBuilder()
.setParam(String.valueOf(count))
.build()
);
responseObserver.onCompleted();
}
};
}
画风突变,最明显的就是有返回值了,而且代码更多了。简单梳理下:
StreamObserver
streamObserver
。同时,我们的返回值也必须是一个
observer
,只能手动创建- 结构
observer
中还是三个方法:onNext
:原来我们调用onNext
是作为添加返回值,这里作为的是接受处理传过来的多个请求我们最原始的这里我们只做记录请求个数的操作。stream
理解就是多个数据,这个就相当于是迭代iterator
请求参数进行处理。
因为我们请求参数是stream
,也就是多个,需要单个进行处理。
onError
:错误处理回调方法onCompleted
:标识方法结束,此时把处理结果返回出去responseObserver.onNext
。- 归纳
- 两个处理器
此时,我们的onNext
就是添加返回,onCompleted
就是返回数据。
我们返回的也当做一个处理器,onNext
就是处理每一个数据,onCompleted
就是全部处理完毕。
- 异步调用
后续会自己启动第二个处理器,处理完毕之后再操作,接上第一个处理器的onNext
添加返回值。
我所理解的就是这样
首先,这是一个异步的处理,我们需要做两个事情
也即是说,我们真的返回还是靠的入参的
- 完善处理方法,也就是第一个
handler
- 定制具体处理办法,第二个
handler
observer
,return
的observer
当做第二个handler
。
Client
StudentServiceGrpc.StudentServiceStub stub2 = StudentServiceGrpc.newStub(managedChannel);
StreamObserver handlerStreamObserver= new StreamObserver(){
@Override
public void onNext(Message.Request value) {
System.out.println("result : " + value.getParam());
}
@Override
public void onCompleted() {
System.out.println("stream - single run over");
}
@Override
public void onError(Throwable t) {}
};
StreamObserverrequestStreamObserver= stub2.countStudent(handlerStreamObserver);
int count = 8;
while(count-->0){
requestStreamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build());
}
requestStreamObserver.onCompleted();
Thread.sleep(3000);
异步stub
StudentServiceGrpc.StudentServiceBlockingStub stub = StudentServiceGrpc.newBlockingStub(managedChannel); StudentServiceGrpc.StudentServiceStub stub2 = StudentServiceGrpc.newStub(managedChannel);
对比一下,第一个是阻塞的,第二个是异步的。
同步的stub
找不到对照的方法,只能异步的。
handler
StreamObserver
handlerStreamObserver= new StreamObserver (){ @Override public void onNext(Message.Request value) { System.out.println("result : " + value.getParam()); } @Override public void onCompleted() { System.out.println("stream - single run over"); } @Override public void onError(Throwable t) { } };
和服务端形成对比,我们要调用,也需要定义一个回调observer
。
调用StreamObserver
requestStreamObserver= stub2.countStudent(handlerStreamObserver); int count = 8; while(count-->0){ requestStreamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build()); } requestStreamObserver.onCompleted();
通过传入回调的observer
,返回observer
。
然后onNext
就可以一次性发起多个请求,onCompleted
就标识结束了。
结束就结束了,没其他的了,具体处理由handler
回调处理。
把手动发起的请求也算作handler
clientHandler2 clientHandler1 serverHandler1 serverHandler2 装载 请求 响应 异步处理 结果返回 异步接收 回调处理 clientHandler2 clientHandler1 serverHandler1 serverHandler2 我们一般操控的就是clientHandler1
和serverHandler1
,直来直去的一次性处理。
即使是single-stream
,也是一次请求,一次性解包处理。
即使不知道服务端返回多少个数据,但是一次性的iterator
也能够一次性传输。
但是请求的话,第一个不知道有多少个,第二个问题就是不知道会延时多久。
一次性处理不仅不行,而且太慢,需要来一个处理一个,才有了异步。
这下,对于stream
有了更深的感触:源源不断,不知几何,来一个处理一个,需异步。
归根结底,我们就是接收请求和处理请求异步分开了,具体的处理自动触发,不归我们管理。
所以netty
中的bossGroup
和workerGroup
的角色这下也更深入了一下。
一个小坑Thread.sleep(3000);
因为是异步的,所以需要等待一下。
要不请求有了响应,具体处理结果还没回来呢。
程序已经结束,还有什么回调呢。
stream-stream
protobuf
rpc batchSearchStudent(stream Request) returns (stream Student){}
handler
public StreamObserver batchSearchStudent(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(Message.Request value) {
responseObserver.onNext(
Message.Student.newBuilder()
.setId(Integer.valueOf(value.getParam()))
.setName("godme")
.setAddr("localhost")
.build()
);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
综合理解一下
handler1-onNext
:添加返回值handler1-completed
:表示处理结束,返回返回值handler2-onNext
:处理单个请求handler2-onCompleted
:表示处理结束h2_onNext h1_onNext h2_onCompleted h1_onCompleted client 计算结果,并添加 执行完毕,并通知 通知返回数据 返回计算结果 h2_onNext h1_onNext h2_onCompleted h1_onCompleted client 抛开层级划分:
onNext
:单个数据处理办法onCompleted
:数据全部处理完毕只不过,
handler2
不由我们进行调用,它onNext
完毕之后自动onComplted
。但是我们可以自定义区掌控
handler1
的动作啊。stream-single
的时候,多个参数,最后只返回一个,所以我们只能一次handler1-onNext
。当
stream-stream
的时候,我们就需要多次的handler1-onNext
了。client
StreamObserver streamStreamObserver = new StreamObserver() {
@Override
public void onNext(Message.Student student) {
System.out.println(student.toString() + "===stream===");
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("stream over");
}
};
StreamObserver streamObserver = stub2.batchSearchStudent(streamStreamObserver);
while(count++ < 4){
streamObserver.onNext(Message.Request.newBuilder().setParam(String.valueOf(count)).build());
}
Thread.sleep(5000);
}
和
single-stream
处理办法一致。小结
handler
handler1-onNext
:添加返回值handler1-onCompleted
:返回返回值handler2-onNext
:处理请求handler2-onCompleted
:处理完毕回调处理的动作主要还是依靠
handler1
来进行把控。是否返回,返回多少个值全部看如何调用
handler1
,是否标记为stream
表现行为一致。当请求参数为
stream
时候,handler2
才会有用,但是返回行为只和handler1
调用相关。stream
stream
的含义就是:源源不断,个数不定。当请求参数不标记为
stream
时,也即是仅有单个的请求时。即是有多个的返回值,也就是返回值标记为
stream
,它会自动整合成iterator
。但是当需要一次性处理多个请求但是只请求一次时,需要多次
onNext
时,就需要去定义observer
。不论客户端还是服务端,这个时候都需要
observer
,也就该handler2
出现了。这个时候,都是异步进行处理,客户端也必须是异步的,都是架好流程自动回调处理。
parameter | return | mode | handler2 |
---|---|---|---|
- | - | single-single | none |
- | stream | single-stream | none |
stream | - | stream-single | need |
stream | stream | stream-stream | need |
【netty-grpc四种方法】代码在此
推荐阅读
- Netty权威指南--图片
- Netty简介
- NIO|Netty应用篇
- Netty|Netty实现聊天室
- 《Netty学习打卡--从小白到放弃》----- 14 - netty 之grpc 初识grpc
- 《Netty学习打卡--从小白到放弃》----- 15 - netty 之grpc Simple RPC 例子
- Netty(一)--java NIO详解
- Netty|Netty4详解一(理解Netty的设计理念NIO)
- 通过Netty实现与硬件设备(充电桩)通讯的功能
- Netty-NIO 详解、说明、优缺