grpc错误处理

对于rpc调用,异常处理是需要考虑的一个方面,这里介绍一下grpc中对异常的处理。注:这里的案例都是改造了之前的接口。
简单 简单模式是一个request对应一个response。

//服务端 public void simpleHello(ProtoObj.Person request, StreamObserver responseObserver) { System.out.println(request.getMyName()+" calling"); //返回一个包装成Exception的Status来返回错误信息,如果直接使用Throwable,客户端无法获得错误信息 responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException()); //如果调用了onError会自动complete无需手动complete //responseObserver.onCompleted(); } //客户端 @Test public voidsimple() throws InterruptedException { final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build(); HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build(); //simple System.out.println("---simple rpc---"); try { //try catch远程调用 System.out.println(blockingStub.simpleHello(person).getString()); }catch(Exception e){ //将异常转换为status可以得到对应的异常信息 Status status = Status.fromThrowable(e); status.asException().printStackTrace(); } channel.shutdown(); } //输出 ---simple rpc--- io.grpc.StatusException: INTERNAL: error desc at io.grpc.Status.asException(Status.java:548) at blog.HelloClient.simple(HelloClient.java:100) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ...

客户端流式 客户端流式是多个request对应单个response。
//服务端 @Override public StreamObserver clientStreamHello( final StreamObserver responseObserver) { return new StreamObserver【grpc错误处理】(){ private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder(); @Override public void onNext(ProtoObj.Person value) { builder.setString(builder.getString() +"," + value.getMyName()); }@Override public void onError(Throwable t) { responseObserver.onError(new Exception("custom error")); }@Override public void onCompleted() { builder.setString("hello"+builder.getString()); //返回异常 responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException()); //responseObserver.onNext(builder.build()); //responseObserver.onCompleted(); } }; } //客户端 @Test public void client() throws InterruptedException {final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build(); HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel); ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build(); //使用latch来判断是否结束 final CountDownLatch latch = new CountDownLatch(1); //client side System.out.println("---client stream rpc---"); StreamObserver responseObserver = new StreamObserver() { @Override public void onNext(ProtoObj.Result result) { System.out.println("client stream--" + result.getString()); }@Override public void onError(Throwable t) { latch.countDown(); //处理异常 Status status = Status.fromThrowable(t); status.asException().printStackTrace(); }@Override public void onCompleted() { latch.countDown(); } }; //发送请求 StreamObserver clientStreamObserver = asyncStub.clientStreamHello(responseObserver); clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build()); clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build()); clientStreamObserver.onCompleted(); //使用latch判断是否结束调用 if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) { throw new RuntimeException("timeout!"); } 
channel.shutdown(); } //输出 ---client stream rpc--- io.grpc.StatusException: INTERNAL: error desc at io.grpc.Status.asException(Status.java:548) at blog.HelloClient$3.onError(HelloClient.java:148) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)

服务端流式 服务端流式是单个request对应多个response。
//服务端 @Override public void serverStreamHello(ProtoObj.Person request, StreamObserver responseObserver) { System.out.println(request.getMyName()+" calling"); responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build()); responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException()); //error之后就不能继续调用next了 //responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build()); //responseObserver.onCompleted(); } //客户端 @Test public void server(){ final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build(); HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build(); //server side System.out.println("---server stream rpc---"); Iterator it = blockingStub.serverStreamHello(person); try{ while (it.hasNext()) { System.out.print(it.next()); } }catch (Exception e){ Status status = Status.fromThrowable(e); status.asException().printStackTrace(); } channel.shutdown(); } //输出 ---server stream rpc--- string: "hello, World" io.grpc.StatusException: INTERNAL: error desc at io.grpc.Status.asException(Status.java:548) at blog.HelloClient.server(HelloClient.java:122) ...

双向流式 双向流式是多个request对应多个response,异常处理和客户端流式类似(使用异步的newStub),这里就不再赘述。
转载于:https://www.cnblogs.com/resentment/p/6883153.html

    推荐阅读