grpc练习

先写 .gropo文件,传输数据

python -m grpc_tools.protoc -I.--python_out=. --grpc_python_out=.reco.proto

  • -I 需要导入的proto文件从哪个目录中寻找 (-I. 表示从当前目录中查找)
    ---python_out proto文件中定义的message 字段生成的python代码文件保存到哪个目录
  • --grpc_python_out proto文件中定义的service 字段生成的python代码文件保存到哪个目录
在这个文件夹右击 updown把那两个文件下载下来
写server.py文件
  • 带_grpc的文件能找到类名
  • 方法名就是双方定义的方法名
  • 不带_grpc的文件 . (带_grpc的文件下面)xxxResponse来创建传输数据的对象
写client.py文件
【grpc练习】运行server文件
python server.py

运行client文件
python client.py

以下是实例代码
# reco.protosyntax = "proto3"; message UserRequest { string user_id=1; int32 channel_id=2; int32 article_num=3; int64 time_stamp=4; }message Track { string click=1; string collect=2; string share=3; string read=4; }message Article { int64 article_id=1; Track track=2; }message ArticleResponse { string exposure=1; int64 time_stamp=2; repeated Article recommends=3; }service UserRecommend { rpc user_recommend(UserRequest) returns(ArticleResponse) {} }

# reco_pb2_grpc.py # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! import grpcimport reco_pb2 as reco__pb2class UserRecommendStub(object): # missing associated documentation comment in .proto file passdef __init__(self, channel): """Constructor.Args: channel: A grpc.Channel. """ self.user_recommend = channel.unary_unary( '/UserRecommend/user_recommend', request_serializer=reco__pb2.UserRequest.SerializeToString, response_deserializer=reco__pb2.ArticleResponse.FromString, )class UserRecommendServicer(object): # missing associated documentation comment in .proto file passdef user_recommend(self, request, context): # missing associated documentation comment in .proto file pass context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!')def add_UserRecommendServicer_to_server(servicer, server): rpc_method_handlers = { 'user_recommend': grpc.unary_unary_rpc_method_handler( servicer.user_recommend, request_deserializer=reco__pb2.UserRequest.FromString, response_serializer=reco__pb2.ArticleResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( 'UserRecommend', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,))

# client.pyimport grpc import reco_pb2_grpc import reco_pb2 import timedef run(): # 连接rpc服务器得到与rpc连接的对象 with grpc.insecure_channel('127.0.0.1:8888') as conn: # 创建进行rpc调用的工具对象 stub = reco_pb2_grpc.UserRecommendStub(conn)# 通过工具对象进行rpc函数调用 req = reco_pb2.UserRequest() req.user_id = '1' req.channel_id = 12 req.article_num = 10 req.time_stamp = round(time.time() * 1000)ret = stub.user_recommend(req) # ret -> ArticleResponse 对象 print(ret)if __name__ == '__main__': run()

# server.pyimport grpc from concurrent.futures import ThreadPoolExecutor import time import reco_pb2_grpc import reco_pb2# 补全被调用的代码 class UserRecommendServicer(reco_pb2_grpc.UserRecommendServicer): # 在接口定义的同名方法中补全,被调用时应该执行的逻辑 def user_recommend(self, request, context): # request是调用的请求数据对象 user_id = request.user_id channel_id = request.channel_id article_num = request.article_num time_stamp = request.time_stampresponse = reco_pb2.ArticleResponse() response.exposure = 'exposure param' response.time_stamp = round(time.time() * 1000) recommends = [] for i in range(article_num): article = reco_pb2.Article() article.track.click = 'click param {}'.format(i + 1) article.track.collect = 'collect param {}'.format(i + 1) article.track.share = 'share param {}'.format(i + 1) article.track.read = 'read param {}'.format(i + 1) article.article_id = i + 1 recommends.append(article) response.recommends.extend(recommends)# 最终要返回一个调用结果 return response# 编写rpc服务器运行代码 def serve(): # 创建rpc服务器对象 server = grpc.server(ThreadPoolExecutor(max_workers=20))# 为rpc服务器 注册能够对外提供的代码 reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendServicer(), server)# 绑定rpc IP地址和端口号 server.add_insecure_port('127.0.0.1:8888')# 启动服务器 server.start()# 非阻塞# 为了防止程序因为非阻塞退出,手动阻塞 while True: time.sleep(10)if __name__ == '__main__': serve()

    推荐阅读