grpc的简单用例|grpc的简单用例 (C++实现)

这个用例的逻辑很简单, 服务器运行一个管理个人信息的服务, 提供如下的四个服务:
(1) 添加一个个人信息
注: 对应于Unary RPCs, 客户端发送单一消息给服务器, 服务器返回单一消息
(2) 添加多个个人信息
注: 对应于Client streaming RPCs, 客户端使用提供的stream发送多个消息给服务端, 等客户端写完了所有的消息, 就会等待服务器读取这些消息, 然后返回响应消息. gRPC保证在一次RPC调用中, 消息是顺序的.
(3) 获取最多N个个人信息
注: 对应于Server streaming RPCs, 客户端发送一条消息给服务端, 然后获取一个stream来读取一系列的返回消息. 客户端会一直读取消息, 知道没有消息可读为止, gRPC保证在一次RPC调用中,消息是顺序的.
(4) 获取指定名字的所有个人信息
注: 对应于Bidirectional streaming RPCs, 这种rcp, 客户端和服务端通过一个read-write stream来发送一系列的消息. 这两个消息流可以独立操作, 就是说, 客户端和服务端可以以任意它们所想的顺序操作这两个消息流. 例如, 服务器可以等待接收到所有的客户端消息时,才开始向客户端发送消息, 或者它可以读一条消息, 然后给客户端发送一条消息, 或者别的想要的方式.在两个消息流的其中一个中, 消息是顺序的.

在给出代码之前, 先说明一件事, 在grpc中, 请求参数和返回值类型都需要是message类型, 而不能是string, int32等类型.下面给出proto文件的定义:

// [START declaration] syntax = "proto3"; package tutorial; import "google/protobuf/timestamp.proto"; // [END declaration]// [START messages] message Person { string name = 1; int32 id = 2; // Unique ID number for this person. string email = 3; enum PhoneType { MOBILE = 0; HOME = 1; WORK = 2; }message PhoneNumber { string number = 1; PhoneType type = 2; }repeated PhoneNumber phones = 4; google.protobuf.Timestamp last_updated = 5; }// Our address book file is just one of these. message AddressBook { repeated Person people = 1; }// rpc调用的结果 message Result { bool success = 1; }// rpc请求的个数 message ReqNum { int32 num = 1; }message ReqName { string name = 1; }// [END messages]// Interface exported by the server. service Manage { // 添加一个人 rpc AddPerson(Person) returns (Result) {} // 添加很多人 rpc AddPersons(stream Person) returns (Result) {} // 获取指定数目的个人列表 rpc GetPersonsLimit(ReqNum) returns (stream Person) {} // 获取名字为输入的个人列表 rpc GetPersons(stream ReqName) returns (stream Person) {} }

Person的定义和之前的protobuf中一致, 新加了一些用于grpc调用的结构体, 这些结构体很简单, 就不讲了. service Manage中定义的是这个服务提供的rpc调用接口.
(1) 添加一个个人信息 对应的是AddPerson
(2) 添加多个个人信息 对应的是 AddPersons
(3) 获取最多N个个人信息 对应的是 GetPersonsLimit
(4) 获取指定名字的所有个人信息 对应的是 GetPersons
rpc定义很直观, 应该可以参照写出需要的rpc, 按照我了解的, 每个rpc有一个输入参数和一个输出参数, 这个需要注意.
person.proto文件生成文件包括person.pb.h与 person.pb.cc和person.grpc.pb.h与person.grpc.pb.cc, 其中的person.pb.h和person.pb.cc文件是proto文件中的结构体等生成的文件, 所以主要关注person.grpc.pb.h和person.grpc.pb.cc文件.
我们查看一下person.grpc.pb.*文件中的内容, 这个文件中只有一个类, 就是class Manage, 这个类名和proto文件中的Service是同一个名字. 下面我们查看Manage类中的内容:
(1) 函数service_full_name用来返回这个服务的名字, 命名方式是: package + “.” + service_name(包名+”.”+服务名).
(2) class StubInterface内部类, 这个类是定义客户端操作的存根(stub)的接口类. 这个类中有如下函数:
1) AddPerson相关的函数, 对应于proto文件中的rpc AddPerson(Person) returns (Result) {}函数:
virtual Status AddPerson(ClientContext *context, const tutorial::Person& request, ::tutorial::Result* response) = 0; std::unique_ptr> AsyncAddPerson(ClientContext* context, const tutorial::Person& request, CompletionQueue *cq) { return unique_ptr>(AsyncAddPersonRaw(context, request, cq)); } unique_ptr> PrepareAsyncAddPerson(ClientContext *context, const tutorial::Person& request, Completion* cq) { return unique_ptr>(PrepareAsyncAddPersonRaw(context, request, cq)); }

2) AddPersons相关函数, 对应于proto文件中的rpc AddPersons(stream Person) returns (Result) {}函数:
unique_ptr> AddPersons(ClientConext* context, tutorial::Result *response) { return unique_ptr(AddPersonsRaw(context, response); } ...

3) GetPersonsLimit相关函数, 对应于proto文件中的rpc GetPersonsLimit(ReqNum) returns (stream Person) {}函数:
unique_ptr> GetPersonsLimit(ClientContext* context, const tutorial::ReqNum& request) { return unique_ptr>(GetPersonsLimitRaw(context, request)); } ...

4) GetPersons相关函数, 对应于proto文件中的rpc GetPersons(stream ReqName) returns (stream Person) {}函数:
unique_ptr> GetPersons(ClientContext *context) { return unique_ptr>(GetPersonsRaw(context)); } ...

5) class experimental_async_interface应该是实验性的异步调用类, 以及获取这个类对象的函数, experimental_async.
6) 实现用的虚函数: AsyncAddPersonRaw, PrepareAsyncAddPersonRaw,AddPersonsRaw, AsyncAddPersonsRaw, PrepareAsyncAddPersonsRaw, AsyncGetPersonsLimitRaw, PrepareAsyncGetPersonsLimitRaw, GetPersonsRaw, AsyncGetPersonsRaw, PrepareAsyncGetPersonsRaw.
(3) class Stub是Manage类的内部类. 这个类是定义客户端操作的存根(stub)的具体实现类. 实现了上面的StubInterface类的各种接口.
(4) 创建客户端存根的函数:
static std::unique_ptr NewStub(const shared_ptr& channel, const StubOptions& options = StubOptions()); unique_ptr Manage::NewStub(const shared_ptr& channel, const StubOptions& options) { (void)options; unique_ptr stub(new Manage::Stub(channel)); return stub; }

(1) class Service内部类, 这个是生成的grpc服务端接口, 服务端主要需要实现的就是这个接口类的接口. 这个类的函数包括:
1) 构造函数与析构函数: Service和~Service虚函数, 下面是构造函数实现:
Manage::Service::Service() { AddMethod(new internal::RpcServiceMethod( Manage_method_names[0], internal::RpcMethod::NORMAL_RPC, new internal::RcpMethodHandler ( std::mem_fn(&Manage::Service::AddPerson), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[1], internal::RpcMethod::CLIENT_STREAMING, new internal::ClientStreamingHandler( std::mem_fn(&Manage::Service::AddPersons), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[2], internal::RpcMethod::SERVER_STREAMING, new internal::ServerStreamingHandler( std::mem_fn(&Manage::Service::GetPersonsLimit), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[3], internal::RpcMethod::BIDI_STREAMING, new internal::BidiStreamingHandler( std; :mem_fn(&Manage::Service::GetPersons), this))); }

2) 虚接口函数:
virtual grpc::Status AddPerson(grpc::ServerContext *context, const tutorial::Person* request, tutorial::Result* response); virtual grpc::Status AddPersons(grpc::ServerContext *context, grpc::ServerReader* reader, tutorial::Result* response); virtual grpc::Status GetPersonsLimit(grpc::ServerContext *context, const tutorial::ReqNum* request, grpc::ServerWriter *writer); virtual grpc::Status GetPersons(grpc::ServerContext* context, grpc::ServerReaderWriter* stream);

(6) 内部模板类WithAsyncMethod_AddPerson, WithAsyncMethod_AddPersons, WithAsyncMethod_GetPersonsLimit, WithAsyncMethod_GetPersons:
template class WithAsyncMethod_AddPerson : public BaseClass template class WithAsyncMethod_AddPerson : public BaseClass template class WithAsyncMethod_GetPersonsLimit : public BaseClass template class WithAsyncMethod_GetPersons : public BaseClass

(7) 异步的服务类:
typedef WithAsyncMethod_AddPerson>> AsyncService;

(8) 内部模板类 ExperimentalWithCallbackMethod_AddPerson, ExperimentalWithCallbackMethod_AddPersons, ExperimentalWithCallback_GetPersonsLimit, ExperimentalWithCallbackMethod_GetPersons:
template class ExperimentalWithCallbackMethod_AddPerson : public BaseClass template class ExperimentalWithCallbackMethod_AddPersons : public BaseClass template class ExperimentalWithCallbackMethod_GetPersonsLimit : public BaseClass template class ExperimentalWithCallbackMethod_GetPersons : public BaseClass

(9) 实验性的带回调函数的服务类:
typedef ExperimentalWithCallbackMethod_AddPerson>> ExperimentalCallbackService;

(10) 内部模板类, WithGenericMethod_AddPerson, WithGenericMethod_AddPersons, WithGenericMethod_GetPersonsLimit, WithGenericMethod_GetPersons:
template class WithGenericMethod_AddPerson : public BaseClass template class WithGenericMethod_AddPersons : public BaseClass template class WithGenericMethod_GetPersonsLimit : public BaseClass template class WithGenericMethod_GetPersons : public BaseClass

(11) 内部模板类, WithRawMethod_AddPerson, WithRawMethod_AddPersons, WithRawMethod_GetPersonsLimit, WithRawMethod_GetPersons:
template class WithRawMethod_AddPerson : public BaseClass template class WithRawMethod_AddPersons : public BaseClass template class WithRawMethod_GetPersonsLimit : public BaseClass template class WithRawMethod_GetPersons : public BaseClass

(12) 内部模板类, ExperimentalWithRawCallbackMethod_AddPerson, ExperimentalWithRawCallbackMethod_AddPersons, ExperimentalWithRawCallbackMethod_GetPersonsLimit, ExperimentalWithRawCallbackMethod_GetPersons:
template class ExperimentalWithRawCallbackMethod_AddPerson : public BaseClass template class ExperimentalWithRawCallbackMethod_AddPersons : public BaseClass template class ExperimentalWithRawCallbackMethod_GetPersonsLimit : public BaseClass template class ExperimentalWithRawCallbackMethod_GetPersons : public BaseClass

(13) 内部模板类, WithStreamedUnaryMethod_AddPerson, WithSplitStreamingMethod_GetPersonsLimit:
template class WithStreamedUnaryMethod_AddPerson : public BaseClass template class WithSplitStreamingMethod_GetPersonsLimit : public BaseClass

(14) 额外类型的服务定义:
typedef WithStreamedUnaryMethod_AddPerson StreamedUnaryService; typedef WithSplitStreamingMethod_GetPersonsLimit SplitStreamedService; typedef WithStreamedUnaryMethod_AddPerson StreamedService;

关于生成文件的讲解, 就差不多这些了, 有空应该讲一下grpc内部调用的逻辑.

下面给出服务端重载proto的Manage服务的代码:
#include #include #include #include #include #include #include "person.grpc.pb.h"class PersonManager { public: explicit PersonManager() { }// AddPerson 用来添加一个人 bool AddPerson(const tutorial::Person& p) { m_persons.insert(p.name(), p); return true; }// GetPerson 用来查找一个人 tutorial::Person GetPerson(const std::string& name) const { return m_persons.at(name); }// GetPersons 用来获取多个人 std::vector GetPersons(int num) const { std::vector personList; auto it = m_persons.begin(); while (it != m_persons.end()) { if (static_cast(personList.size()) > num) { return personList; } personList.push_back(it->second); ++it; } return personList; }private: folly::ConcurrentHashMapstring, tutorial::Person> m_persons; }; class PersonService : public tutorial::Manage::Service { public: explicit PersonService() { }// AddPerson 用来添加一个人 grpc::Status AddPerson(grpc::ServerContext* context, const tutorial::Person *person, tutorial::Result *res) override { m_mgr.AddPerson(*person); res->set_success(true); return grpc::Status::OK; }// AddPersons 用来添加多个用户 grpc::Status AddPersons(grpc::ServerContext* context, grpc::ServerReader* reader, tutorial::Result *res) override { tutorial::Person person; while (reader->Read(&person)) { m_mgr.AddPerson(person); } res->set_success(true); return grpc::Status::OK; }// GetPersonsLimit 用来查询一个人 grpc::Status GetPersonsLimit(grpc::ServerContext* context, const tutorial::ReqNum *num, grpc::ServerWriter* writer) override { auto persons = m_mgr.GetPersons(num->num()); for (const auto& person : persons) { writer->Write(person); } return grpc::Status::OK; }// GetPersons 用来根据人名获取所有的人 grpc::Status GetPersons(grpc::ServerContext *context, grpc::ServerReaderWriter* stream) override { tutorial::ReqName name; while (stream->Read(&name)) { try { stream->Write(m_mgr.GetPerson(name.name())); } catch (const std::out_of_range& ex) { // 如果出现越界的问题, 则说明不存在 } } return grpc::Status::OK; }private: PersonManager m_mgr; };

下面给出创建grpc服务器的代码:
#include #include "person_manage.h"// maxThreadNum 根据计算机硬件设置 const int maxThreadNum = 20; void RunServer() { std::string server_address("localhost:50001"); PersonService service; grpc::ServerBuilder builder; grpc::ResourceQuota quota; quota.SetMaxThreads(maxThreadNum); builder.SetResourceQuota(quota); builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); }int main(int argc, char** argv) { RunServer(); return 0; }

下面给出客户端对proto中的Manage服务的封装代码:
#include #include #include #include #include #include #include #include #include "person.grpc.pb.h"class PersonManip { public: PersonManip(std::shared_ptr channel) : m_stub(tutorial::Manage::NewStub(channel)) { }// 添加一个用户 bool AddPerson(const tutorial::Person& person) { grpc::ClientContext context; tutorial::Result res; grpc::Status status = m_stub->AddPerson(&context, person, &res); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; } return res.success(); }// 添加多个用户, 当前的服务端实现可能造成部分插入的情况 bool AddPersons(const std::vector& persons) { grpc::ClientContext context; tutorial::Result res; std::unique_ptr> writer( m_stub->AddPersons(&context, &res)); for (const auto& person : persons) { if (!writer->Write(person)) { // Broken stream. break; } } writer->WritesDone(); grpc::Status status = writer->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; }return res.success(); }// 获取限定数目的用户 bool GetPersonsLimit(int limitNum, std::vector& persons) { grpc::ClientContext context; tutorial::ReqNum limit; limit.set_num(limitNum); std::unique_ptr> reader( m_stub->GetPersonsLimit(&context, limit)); tutorial::Person person; while (reader->Read(&person)) { persons.push_back(person); } grpc::Status status = reader->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; }return true; }// 获取所有指定名字的用户 bool GetPersons(const std::vectorstring>& personNames, std::vector& persons) { grpc::ClientContext context; std::shared_ptr> stream( m_stub->GetPersons(&context)); std::thread writer([stream, &personNames]() { for (const auto& personName : personNames) { tutorial::ReqName name; name.set_name(personName); stream->Write(name); } stream->WritesDone(); }); tutorial::Person person; while (stream->Read(&person)) { persons.push_back(person); } writer.join(); grpc::Status status = stream->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; }return true; }private: std::unique_ptr m_stub; };

下面给出客户端测试的代码:
#include "person_manip.h"tutorial::Person makePerson(const std::string& name, int id, const std::string& email) { tutorial::Person person; person.set_name(name); person.set_id(id); person.set_email(email); return person; }void printPersons(const std::vector& persons) { for (const auto& p : persons) { std::cout << "name: " << p.name() << " " << "id: " << p.id() << " " << "email: " << p.email() << std::endl; } std::cout << std::endl; }int main(int argc, char **argv) { PersonManip manip( grpc::CreateChannel("localhost:50001", grpc::InsecureChannelCredentials())); auto person = makePerson("Tom", 1, "tom@gmail.com"); auto suc = manip.AddPerson(person); if (!suc) { std::cout << "manip.AddPerson failed." << std::endl; return -1; }person = makePerson("Lilly", 2, "lilly@gmail.com"); auto person2 = makePerson("Jim", 3, "jim@gmail.com"); std::vector persons{person, person2}; suc = manip.AddPersons(persons); if (!suc) { std::cout << "manip.AddPersons failed." << std::endl; return -1; }std::vector resPersons; suc = manip.GetPersonsLimit(5, resPersons); if (!suc) { std::cout << "manip.GetPersonsLimit failed." << std::endl; return -1; } std::cout << "manip.GetPersonsLimit output:" << std::endl; printPersons(resPersons); resPersons.clear(); std::vectorstring> personNames; for (const auto& p : persons) { personNames.push_back(p.name()); } suc = manip.GetPersons(personNames, resPersons); if (!suc) { std::cout << "manip.GetPersons failed." << std::endl; return -1; } std::cout << "manip.GetPersons output:" << std::endl; printPersons(resPersons); return 0; }

这个我没有使用单元测试, 可能使用单元测试会更好, 不过根据客户端代码和输出, 也可以验证服务的正确性.
完整的代码参考: https://github.com/ss-torres/person-service.git
如果有什么建议或者提议, 欢迎提出

【grpc的简单用例|grpc的简单用例 (C++实现)】转载于:https://www.cnblogs.com/albizzia/p/10830878.html

    推荐阅读