grpc四种连接模式

一:为什么要用grpc

With gRPC we can define our service once in a .proto file and implement clients and servers in any of gRPC’s supported languages, which in turn can be run in environments ranging from servers inside Google to your own tablet - all the complexity of communication between different languages and environments is handled for you by gRPC. We also get all the advantages of working with protocol buffers, including efficient serialization, a simple IDL, and easy interface updating.
通俗讲就是可以通过.proto file来解耦客户端和服务端代码实现,支持跨平台跨语言,其自身拥有支持protobuf协议,高性能序列化,简单的接口描述以及便捷的接口升级等优点
二:建立grpc的步骤
  1. 定义.proto文件(protobuf语法使用)
  2. 通过protobuf的编译器插件自动生成客户端和服务端代码
  3. 通过java grpc api为服务编写相应的客户端和服务端代码
三:定义.proto文件
syntax = "proto3"; option java_package = "io.grpc.examples.routeguide"; option java_outer_classname = "RouteGuide"; // Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point { int32 latitude = 1; int32 longitude = 2; }service RouteGuide {// Obtains the feature at a given position. // 简单grpc调用:客户端通过stub发起请求 等待服务端返回结果 就像本地调用一样 rpc GetFeature(Point) returns (Feature) {}// Obtains the Features available within the given Rectangle.Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. //服务端流式调用:客户端发起一次请求,服务端不是返回一个结果,而是将一组结果通过流返回 rpc ListFeatures(Rectangle) returns (stream Feature) {}// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. //客户端流式调用:客户端发起一组请求,服务端等到客户端所有请求发送完毕,接收到客户端的onCompleted()调用,此时服务端发送一次结果给客户端 rpc RecordRoute(stream Point) returns (RouteSummary) {}// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). //客户端服务端双向流:客户端和服务端双向流互不干预,可各种按照自己的顺序消费处理,比如服务端可以选择每次接受客户端一个请求就返回一个结果,也可以选择等客户端所有请求发送完毕收到客户端的onCompleted()调用再把所有的返回结果一次性返回给客户端 rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} }

四:简单grpc调用
  • 服务端代码:
@Override public void getFeature(Point request, StreamObserver responseObserver) { responseObserver.onNext(checkFeature(request)); responseObserver.onCompleted(); }...private Feature checkFeature(Point location) { for (Feature feature : features) { if (feature.getLocation().getLatitude() == location.getLatitude() && feature.getLocation().getLongitude() == location.getLongitude()) { return feature; } }// No feature was found, return an unnamed feature. return Feature.newBuilder().setName("").setLocation(location).build(); }

  • 客户端代码:
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build(); Feature feature; try { feature = blockingStub.getFeature(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; }

  • 请求过程
  1. 客户端构造request,并通过stub调用getFeature方法,传入request对象
  2. 服务端接收到request对象,完成处理,通过responseObserver.onNext()返回给客户端,通过调用responseObserver.onCompleted()结束此次rpc调用
五:服务端流式grpc调用
  • 服务端代码:
private final Collection features; ...@Override public void listFeatures(Rectangle request, StreamObserver responseObserver) { int left = min(request.getLo().getLongitude(), request.getHi().getLongitude()); int right = max(request.getLo().getLongitude(), request.getHi().getLongitude()); int top = max(request.getLo().getLatitude(), request.getHi().getLatitude()); int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude()); for (Feature feature : features) { if (!RouteGuideUtil.exists(feature)) { continue; }int lat = feature.getLocation().getLatitude(); int lon = feature.getLocation().getLongitude(); if (lon >= left && lon <= right && lat >= bottom && lat <= top) { responseObserver.onNext(feature); } } responseObserver.onCompleted(); }

  • 客户端代码:
Rectangle request = Rectangle.newBuilder() .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build(); Iterator features; try { features = blockingStub.listFeatures(request); } catch (StatusRuntimeException ex) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; }

  • 请求过程
  1. 客户端构造request对象,并且通过stub调用listFeatures方法,传入request对象
  2. 服务端接收到request对象,对每一个feature对象通过responseObserver.onNext()返回给客户端,一次请求会返回多次feature对象,最后通过responseObserver.onCompleted()结束rpc的处理,此时客户端接收到Iterator features
六:客户端流式grpc调用
【grpc四种连接模式】客户端流式grpc调用稍微复杂一点,服务端会为客户端生成一个StreamObserver()供客户端流式调用,而客户端会生成一个StreamObserver responseObserver供服务端一次返回结果调用
  • 服务端代码
@Override public StreamObserver recordRoute(final StreamObserver responseObserver) { return new StreamObserver() { int pointCount; int featureCount; int distance; Point previous; long startTime = System.nanoTime(); @Override public void onNext(Point point) { pointCount++; if (RouteGuideUtil.exists(checkFeature(point))) { featureCount++; } // For each point after the first, add the incremental distance from the previous point // to the total distance value. if (previous != null) { distance += calcDistance(previous, point); } previous = point; }@Override public void onError(Throwable t) { logger.log(Level.WARNING, "Encountered error in recordRoute", t); }@Override public void onCompleted() { long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime); responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount) .setFeatureCount(featureCount).setDistance(distance) .setElapsedTime((int) seconds).build()); responseObserver.onCompleted(); } }; }

  • 客户端代码
public void recordRoute(List features, int numPoints) throws InterruptedException { info("*** RecordRoute"); final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver responseObserver = new StreamObserver() { @Override public void onNext(RouteSummary summary) { info("Finished trip with {0} points. Passed {1} features. " + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(), summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime()); }@Override public void onError(Throwable t) { Status status = Status.fromThrowable(t); logger.log(Level.WARNING, "RecordRoute Failed: {0}", status); finishLatch.countDown(); }@Override public void onCompleted() { info("Finished RecordRoute"); finishLatch.countDown(); } }; StreamObserver requestObserver = asyncStub.recordRoute(responseObserver); try { // Send numPoints points randomly selected from the features list. Random rand = new Random(); for (int i = 0; i < numPoints; ++i) { int index = rand.nextInt(features.size()); Point point = features.get(index).getLocation(); info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point), RouteGuideUtil.getLongitude(point)); requestObserver.onNext(point); // Sleep for a bit before sending the next one. Thread.sleep(rand.nextInt(1000) + 500); if (finishLatch.getCount() == 0) { // RPC completed or errored before we finished sending. // Sending further requests won't error, but they will just be thrown away. return; } } } catch (RuntimeException e) { // Cancel RPC requestObserver.onError(e); throw e; } // Mark the end of requests requestObserver.onCompleted(); // Receiving happens asynchronously finishLatch.await(1, TimeUnit.MINUTES); }

  • 调用过程
  1. 客户端构造responseObserver对象,并通过stub调用recordRoute方法传入responseObserver对象
  2. 服务端接收到客户端的调用,生成一个new StreamObserver()返回给客户端,客户端通过返回的StreamObserver()对象发起流式请求
  3. 客户端通过CountDownLatch阻塞当前线程
  4. 当客户端完成流式调用时,执行requestObserver.onCompleted(),此时服务端收到onCompleted()调用,执行 responseObserver.onNext()和responseObserver.onCompleted();
  5. 当服务端执行responseObserver.onCompleted()时,客户端收到onCompleted()调用,执行info(“Finished RecordRoute”)和finishLatch.countDown();
  6. 当客户端finishLatch.countDown()时,当前线程被释放,该次grpc调用结束
七:客户端服务端双向流式grpc调用
  • 服务端代码
@Override public StreamObserver routeChat(final StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(RouteNote note) { List notes = getOrCreateNotes(note.getLocation()); // Respond with all previous notes at this location. for (RouteNote prevNote : notes.toArray(new RouteNote[0])) { responseObserver.onNext(prevNote); }// Now add the new note to the list notes.add(note); }@Override public void onError(Throwable t) { logger.log(Level.WARNING, "Encountered error in routeChat", t); }@Override public void onCompleted() { responseObserver.onCompleted(); } }; }

  • 客户端代码
public void routeChat() throws Exception { info("*** RoutChat"); final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver requestObserver = asyncStub.routeChat(new StreamObserver() { @Override public void onNext(RouteNote note) { info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() .getLatitude(), note.getLocation().getLongitude()); }@Override public void onError(Throwable t) { Status status = Status.fromThrowable(t); logger.log(Level.WARNING, "RouteChat Failed: {0}", status); finishLatch.countDown(); }@Override public void onCompleted() { info("Finished RouteChat"); finishLatch.countDown(); } }); try { RouteNote[] requests = {newNote("First message", 0, 0), newNote("Second message", 0, 1), newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)}; for (RouteNote request : requests) { info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() .getLatitude(), request.getLocation().getLongitude()); requestObserver.onNext(request); } } catch (RuntimeException e) { // Cancel RPC requestObserver.onError(e); throw e; } // Mark the end of requests requestObserver.onCompleted(); // Receiving happens asynchronously finishLatch.await(1, TimeUnit.MINUTES); }

  • 调用过程
  1. 客户端生成一个new StreamObserver()作为responseObserver对象传给服务端
  2. 服务端return new StreamObserver()作为requestObserver对象传给客户端
  3. 通过CountDownLatch阻塞当前线程
  4. 客户端执行requestObserver.onNext(request),服务端收到onNext()调用,并执行responseObserver.onNext(prevNote);
  5. 客户端收到步骤4中服务端responseObserver.onNext(prevNote)调用,执行info(“Got message “{0}” at {1}, {2}”, note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
  6. 当客户端请求完毕,执行requestObserver.onCompleted()时,服务端收到onCompleted()调用,执行responseObserver.onCompleted()调用
  7. 接着步骤6,客户端收到responseObserver.onCompleted()调用,执行finishLatch.countDown(),当前线程被释放,该次grpc调用结束
  • grpc-tutorials-java
  • grpc官网
  • protobuf-github地址

    推荐阅读