一:为什么要用grpc
With gRPC we can define our service once in a .proto file and implement clients and servers in any of gRPC’s supported languages, which in turn can be run in environments ranging from servers inside Google to your own tablet - all the complexity of communication between different languages and environments is handled for you by gRPC. We also get all the advantages of working with protocol buffers, including efficient serialization, a simple IDL, and easy interface updating.二:建立grpc的步骤
通俗讲就是可以通过.proto file来解耦客户端和服务端代码实现,支持跨平台跨语言,其自身拥有支持protobuf协议,高性能序列化,简单的接口描述以及便捷的接口升级等优点
- 定义.proto文件(protobuf语法使用)
- 通过protobuf的编译器插件自动生成客户端和服务端代码
- 通过java grpc api为服务编写相应的客户端和服务端代码
syntax = "proto3";
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuide";
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}service RouteGuide {// Obtains the feature at a given position.
// 简单grpc调用:客户端通过stub发起请求 等待服务端返回结果 就像本地调用一样
rpc GetFeature(Point) returns (Feature) {}// Obtains the Features available within the given Rectangle.Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
//服务端流式调用:客户端发起一次请求,服务端不是返回一个结果,而是将一组结果通过流返回
rpc ListFeatures(Rectangle) returns (stream Feature) {}// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
//客户端流式调用:客户端发起一组请求,服务端等到客户端所有请求发送完毕,接收到客户端的onCompleted()调用,此时服务端发送一次结果给客户端
rpc RecordRoute(stream Point) returns (RouteSummary) {}// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
//客户端服务端双向流:客户端和服务端双向流互不干预,可各种按照自己的顺序消费处理,比如服务端可以选择每次接受客户端一个请求就返回一个结果,也可以选择等客户端所有请求发送完毕收到客户端的onCompleted()调用再把所有的返回结果一次性返回给客户端
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
四:简单grpc调用
- 服务端代码:
@Override
public void getFeature(Point request, StreamObserver responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}...private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
- 客户端代码:
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
- 请求过程
- 客户端构造request,并通过stub调用getFeature方法,传入request对象
- 服务端接收到request对象,完成处理,通过responseObserver.onNext()返回给客户端,通过调用responseObserver.onCompleted()结束此次rpc调用
- 服务端代码:
private final Collection features;
...@Override
public void listFeatures(Rectangle request, StreamObserver responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
- 客户端代码:
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException ex) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
- 请求过程
- 客户端构造request对象,并且通过stub调用listFeatures方法,传入request对象
- 服务端接收到request对象,对每一个feature对象通过responseObserver.onNext()返回给客户端,一次请求会返回多次feature对象,最后通过responseObserver.onCompleted()结束rpc的处理,此时客户端接收到Iterator features
【grpc四种连接模式】客户端流式grpc调用稍微复杂一点,服务端会为客户端生成一个StreamObserver()供客户端流式调用,而客户端会生成一个StreamObserver responseObserver供服务端一次返回结果调用
- 服务端代码
@Override
public StreamObserver recordRoute(final StreamObserver responseObserver) {
return new StreamObserver() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
- 客户端代码
public void recordRoute(List features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver responseObserver = new StreamObserver() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0;
i < numPoints;
++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
- 调用过程
- 客户端构造responseObserver对象,并通过stub调用recordRoute方法传入responseObserver对象
- 服务端接收到客户端的调用,生成一个new StreamObserver()返回给客户端,客户端通过返回的StreamObserver()对象发起流式请求
- 客户端通过CountDownLatch阻塞当前线程
- 当客户端完成流式调用时,执行requestObserver.onCompleted(),此时服务端收到onCompleted()调用,执行 responseObserver.onNext()和responseObserver.onCompleted();
- 当服务端执行responseObserver.onCompleted()时,客户端收到onCompleted()调用,执行info(“Finished RecordRoute”)和finishLatch.countDown();
- 当客户端finishLatch.countDown()时,当前线程被释放,该次grpc调用结束
- 服务端代码
@Override
public StreamObserver routeChat(final StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(RouteNote note) {
List notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}// Now add the new note to the list
notes.add(note);
}@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
- 客户端代码
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver requestObserver =
asyncStub.routeChat(new StreamObserver() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
- 调用过程
- 客户端生成一个new StreamObserver()作为responseObserver对象传给服务端
- 服务端return new StreamObserver()作为requestObserver对象传给客户端
- 通过CountDownLatch阻塞当前线程
- 客户端执行requestObserver.onNext(request),服务端收到onNext()调用,并执行responseObserver.onNext(prevNote);
- 客户端收到步骤4中服务端responseObserver.onNext(prevNote)调用,执行info(“Got message “{0}” at {1}, {2}”, note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
- 当客户端请求完毕,执行requestObserver.onCompleted()时,服务端收到onCompleted()调用,执行responseObserver.onCompleted()调用
- 接着步骤6,客户端收到responseObserver.onCompleted()调用,执行finishLatch.countDown(),当前线程被释放,该次grpc调用结束
- grpc-tutorials-java
- grpc官网
- protobuf-github地址