分布式学习(1)gRPC python@protobuf_grpc

一、安装
  1. gRPC 的安装:
$ pip install grpcio
  1. ProtoBuf 相关的 python 依赖库:
$ pip install protobuf
  1. python grpc 的 protobuf 编译工具:
$ pip install grpcio-tools
二、protobuf 文档
【分布式学习(1)gRPC python@protobuf_grpc】信息结构体 message
message SearchRequest {
required string query = 1; // 查询字符串
optional int32 page_number = 2; // 第几页
optional int32 result_per_page = 3; // 每页的结果数
}
# S——1:指定字段规则
[1] required: 格式良好的 message 必须包含该字段一次;
[2] optional: 格式良好的 message 可以包含该字段零次或一次(不超过一次);optional格式的message 可以使用defalut方法:
optional int32 result_per_page = 3 [default = 10];
[3] repeated: 该字段可以在格式良好的消息中重复任意多次(包括零),其中重复值的顺序会被保留;由于一些历史原因,标量数字类型的 repeated 字段不能尽可能高效地编码。新代码应使用特殊选项 [packed = true] 来获得更高效的编码。例如 repeated int32 samples = 4 [packed=true];
# S——2:添加注释
[1] 为你的 .proto 文件添加注释,可以使用 C/C++ 语法风格的注释 // 和 /* ... */;
# S——3:简单数据类型
?分布式学习(1)gRPC python@protobuf_grpc
文章图片

# S——3:复杂数据类型
[1] 枚举 Enumerations:
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
[2] 嵌套类型
[3] Any类型:Any类型消息允许在没有指定.proto定义的情况下使用消息作为一个嵌套类型。一个Any类型包括一个可以被序列化bytes类型的任意消息以及一个URL作为一个全局标识符和解析消息类型。
为了使用Any类型,需要导入import google/protobuf/any.proto
import "google/protobuf/any.proto"; message ErrorStatus {
string message = 1;
repeated google.protobuf.Any details = 2;
}
[4] Oneof类型:如果你的 message 包含许多可选字段,并且最多只能同时设置其中一个字段,则可以使用 oneof 功能强制执行此行为并节省内存。Oneof 字段类似于可选字段,除了 oneof 共享内存中的所有字段,并且最多只能同时设置一个字段。设置 oneof 的任何成员会自动清除所有其他成员。你可以使用特殊的 case() 或 WhichOneof() 方法检查 oneof 字段中当前是哪个值(如果有)被设置,具体方法取决于你选择的语言。
message SampleMessage {
oneof test_oneof {
string name = 4;
SubMessage sub_message = 9;
}
}
[5] Map 类型:其中key_type可以是任意Integer或者string类型(除了floating和bytes的任意标量类型都可以),value_type可以是任意类型,但不能是map类型。
map map_field = N;

# S——4:允许添加多个 message
[1] 可以在单个 .proto 文件中定义多种 message 类型。比如 request_message 和 response_message
定义服务
如果想要将消息类型用在RPC(远程方法调用)系统中,可以在.proto文件中定义一个RPC服务接口,Protobuf编译器将会根据所选择的不同语言生成服务接口代码及stub。如要定义一个RPC服务并具有一个方法Search,Search方法能够接收SearchRequest并返回一个SearchResponse,可以在.proto文件中进行如下定义:
service SearchService {
rpc Search (SearchRequest) returns (SearchResponse);
}
三、尝试案例(传输字母,大写字母返回)
# data.proto syntax = "proto3"; package example; service FormatData { rpc DoFormat(Data) returns (Data){} } message Data { string text = 1; }$cmd>> python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./data.proto

# client.py#! /usr/bin/env python # -*- coding: utf-8 -*- import grpc import data_pb2, data_pb2_grpc_HOST = 'localhost' _PORT = '8080'def run(): conn = grpc.insecure_channel(_HOST + ':' + _PORT) client = data_pb2_grpc.FormatDataStub(channel=conn) response = client.DoFormat(data_pb2.Data(text='hello,world!')) print("received: " + response.text)if __name__ == '__main__': run()

# server.py#! /usr/bin/env python # -*- coding: utf-8 -*- import grpc import time from concurrent import futures import data_pb2, data_pb2_grpc_ONE_DAY_IN_SECONDS = 60 * 60 * 24 _HOST = 'localhost' _PORT = '8080'class FormatData(data_pb2_grpc.FormatDataServicer): def DoFormat(self, request, context): str = request.text return data_pb2.Data(text=str.upper())def serve(): grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=4)) data_pb2_grpc.add_FormatDataServicer_to_server(FormatData(), grpcServer) grpcServer.add_insecure_port(_HOST + ':' + _PORT) grpcServer.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0)if __name__ == '__main__': serve()

引用本文--->click
一、本地调用与远程调用
本地过程调用
RPC就是要像调用本地的函数一样去调远程函数。在研究RPC前,我们先看看本地调用是怎么调的。假设我们要调用函数Multiply来计算lvalue * rvalue的结果:
1 int Multiply(int l, int r) { 2int y = l * r; 3return y; 4 } 5 6 int lvalue = https://www.it610.com/article/10; 7 int rvalue = 20; 8 int l_times_r = Multiply(lvalue, rvalue);

那么在第8行时,我们实际上执行了以下操作:
  1. 将 lvalue 和 rvalue 的值压栈
  2. 进入Multiply函数,取出栈中的值10 和 20,将其赋予 l 和 r
  3. 执行第2行代码,计算 l * r ,并将结果存在 y
  4. 将 y 的值压栈,然后从Multiply返回
  5. 第8行,从栈中取出返回值 200 ,并赋值给 l_times_r
以上5步就是执行本地调用的过程。(20190116注:以上步骤只是为了说明原理。事实上编译器经常会做优化,对于参数和返回值少的情况会直接将其存放在寄存器,而不需要压栈弹栈的过程,甚至都不需要调用call,而直接做inline操作。仅就原理来说,这5步是没有问题的。)
远程过程调用带来的新问题
在远程调用时,我们需要执行的函数体是在远程的机器上的,也就是说,Multiply是在另一个进程中执行的。这就带来了几个新问题:
  1. Call ID映射。我们怎么告诉远程机器我们要调用Multiply,而不是Add或者FooBar呢?在本地调用中,函数体是直接通过函数指针来指定的,我们调用Multiply,编译器就自动帮我们调用它相应的函数指针。但是在远程调用中,函数指针是不行的,因为两个进程的地址空间是完全不一样的。所以,在RPC中,所有的函数都必须有自己的一个ID。这个ID在所有进程中都是唯一确定的。客户端在做远程过程调用时,必须附上这个ID。然后我们还需要在客户端和服务端分别维护一个 {函数 <--> Call ID} 的对应表。两者的表不一定需要完全相同,但相同的函数对应的Call ID必须相同。当客户端需要进行远程调用时,它就查一下这个表,找出相应的Call ID,然后把它传给服务端,服务端也通过查表,来确定客户端需要调用的函数,然后执行相应函数的代码。
  2. 序列化和反序列化。客户端怎么把参数值传给远程的函数呢?在本地调用中,我们只需要把参数压到栈里,然后让函数自己去栈里读就行。但是在远程过程调用时,客户端跟服务端是不同的进程,不能通过内存来传递参数。甚至有时候客户端和服务端使用的都不是同一种语言(比如服务端用C++,客户端用Java或者Python)。这时候就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式。这个过程叫序列化和反序列化。同理,从服务端返回的值也需要序列化反序列化的过程。
  3. 网络传输。远程调用往往用在网络上,客户端和服务端是通过网络连接的。所有的数据都需要通过网络传输,因此就需要有一个网络传输层。网络传输层需要把Call ID和序列化后的参数字节流传给服务端,然后再把序列化后的调用结果传回客户端。只要能完成这两者的,都可以作为传输层使用。因此,它所使用的协议其实是不限的,能完成传输就行。尽管大部分RPC框架都使用TCP协议,但其实UDP也可以,而gRPC干脆就用了HTTP2。Java的Netty也属于这层的东西。
有了这三个机制,就能实现RPC了,具体过程如下:
// Client端 //int l_times_r = Call(ServerAddr, Multiply, lvalue, rvalue) 1. 将这个调用映射为Call ID。这里假设用最简单的字符串当Call ID的方法 2. 将Call ID,lvalue和rvalue序列化。可以直接将它们的值以二进制形式打包 3. 把2中得到的数据包发送给ServerAddr,这需要使用网络传输层 4. 等待服务器返回结果 5. 如果服务器调用成功,那么就将结果反序列化,并赋给l_times_r// Server端 1. 在本地维护一个Call ID到函数指针的映射call_id_map,可以用std::map> 2. 等待请求 3. 得到一个请求后,将其数据包反序列化,得到Call ID 4. 通过在call_id_map中查找,得到相应的函数指针 5. 将lvalue和rvalue反序列化后,在本地调用Multiply函数,得到结果 6. 将结果序列化后通过网络返回给Client

所以要实现一个RPC框架,其实只需要按以上流程实现就基本完成了。
其中:
  • Call ID映射可以直接使用函数字符串,也可以使用整数ID。映射表一般就是一个哈希表。
  • 序列化反序列化可以自己写,也可以使用Protobuf或者FlatBuffers之类的。
  • 网络传输库可以自己写socket,或者用asio,ZeroMQ,Netty之类。
当然,这里面还有一些细节可以填充,比如如何处理网络错误,如何防止攻击,如何做流量控制,等等。但有了以上的架构,这些都可以持续加进去。

    推荐阅读