grpc的简单用例|grpc的简单用例 (golang实现)

这个用例的逻辑很简单, 服务器运行一个管理个人信息的服务, 提供如下的四个服务:
(1) 添加一个个人信息
注: 对应于Unary RPCs, 客户端发送单一消息给服务器, 服务器返回单一消息
(2) 添加多个个人信息
注: 对应于Client streaming RPCs, 客户端使用提供的stream发送多个消息给服务端, 等客户端写完了所有的消息, 就会等待服务器读取这些消息, 然后返回响应消息. gRPC保证在一次RPC调用中, 消息是顺序的.
(3) 获取最多N个个人信息
注: 对应于Server streaming RPCs, 客户端发送一条消息给服务端, 然后获取一个stream来读取一系列的返回消息. 客户端会一直读取消息, 知道没有消息可读为止, gRPC保证在一次RPC调用中,消息是顺序的.
(4) 获取指定名字的所有个人信息
注: 对应于Bidirectional streaming RPCs, 这种rcp, 客户端和服务端通过一个read-write stream来发送一系列的消息. 这两个消息流可以独立操作, 就是说, 客户端和服务端可以以任意它们所想的顺序操作这两个消息流. 例如, 服务器可以等待接收到所有的客户端消息时,才开始向客户端发送消息, 或者它可以读一条消息, 然后给客户端发送一条消息, 或者别的想要的方式.在两个消息流的其中一个中, 消息是顺序的.

在给出代码之前, 先说明一件事, 在grpc中, 请求参数和返回值类型都需要是message类型, 而不能是string, int32等类型.下面给出proto文件的定义:

// [START declaration] syntax = "proto3"; package tutorial; import "google/protobuf/timestamp.proto"; // [END declaration]// [START messages] message Person { string name = 1; int32 id = 2; // Unique ID number for this person. string email = 3; enum PhoneType { MOBILE = 0; HOME = 1; WORK = 2; }message PhoneNumber { string number = 1; PhoneType type = 2; }repeated PhoneNumber phones = 4; google.protobuf.Timestamp last_updated = 5; }// Our address book file is just one of these. message AddressBook { repeated Person people = 1; }// rpc调用的结果 message Result { bool success = 1; }// rpc请求的个数 message ReqNum { int32 num = 1; }message ReqName { string name = 1; }// [END messages]// Interface exported by the server. service Manage { // 添加一个人 rpc AddPerson(Person) returns (Result) {} // 添加很多人 rpc AddPersons(stream Person) returns (Result) {} // 获取指定数目的个人列表 rpc GetPersonsLimit(ReqNum) returns (stream Person) {} // 获取名字为输入的个人列表 rpc GetPersons(stream ReqName) returns (stream Person) {} }

Person的定义和之前的protobuf中一致, 新加了一些用于grpc调用的结构体, 这些结构体很简单, 就不讲了. service Manage中定义的是这个服务提供的rpc调用接口.
(1) 添加一个个人信息 对应的是AddPerson
(2) 添加多个个人信息 对应的是 AddPersons
(3) 获取最多N个个人信息 对应的是 GetPersonsLimit
(4) 获取指定名字的所有个人信息 对应的是 GetPersons
rpc定义很直观, 应该可以参照写出需要的rpc, 按照我了解的, 每个rpc有一个输入参数和一个输出参数, 这个需要注意.
下面给出服务端实现proto的Manage服务的代码:
package mainimport ( "context" "fmt" "io" "log" "net" "sync" pb "personservice/tutorial" "google.golang.org/grpc" )// 个人信息服务端 type personServer struct { persons sync.Map }// AddPerson 添加一个个人信息 func (s *personServer) AddPerson(ctx context.Context, person *pb.Person) (*pb.Result, error) { s.persons.LoadOrStore(person.Name, person) return &pb.Result{ Success: true, }, nil }// AddPersons 添加多个个人信息 func (s *personServer) AddPersons(stream pb.Manage_AddPersonsServer) error { for { person, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.Result{ Success: true, }) }if err != nil { return err }s.persons.LoadOrStore(person.Name, person) } }// GetPersonsLimit 获取限定数目的个人信息 func (s *personServer) GetPersonsLimit(limitNum *pb.ReqNum, stream pb.Manage_GetPersonsLimitServer) error { var err error var i int32 s.persons.Range(func(key, value interface{}) bool { person, ok := value.(*pb.Person) if !ok { return false } err = stream.Send(person) if err != nil { return false } i++ if i >= (limitNum.Num) { return false } return true }) return err }// GetPersons 获取给定名字的所有个人信息 func (s *personServer) GetPersons(stream pb.Manage_GetPersonsServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } value, ok := s.persons.Load(in.Name) if !ok { continue } person, ok := value.(*pb.Person) if !ok { continue } err = stream.Send(person) if err != nil { return err } } }func newServer() *personServer { s := &personServer{} return s }func main() { address := "localhost:50001" lis, err := net.Listen("tcp", address) if err != nil { log.Fatalf("failed to listen: %v", err) } var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) pb.RegisterManageServer(grpcServer, newServer()) fmt.Println("Server listening on:", address) grpcServer.Serve(lis) }

下面代码实现了客户端对Manage服务的rpc调用:
package mainimport ( "context" "fmt" "io" "log" pb "personservice/tutorial" "time" "google.golang.org/grpc" )const ( rpcTimeOut = 10 )// addPerson 用于添加个人信息 func addPerson(client pb.ManageClient, person *pb.Person) bool { ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second) defer cancel() res, err := client.AddPerson(ctx, person) if err != nil { log.Printf("client.AddPerson failed, error: %v\n", err) return false } return res.Success}// addPersons 用来添加多个个人信息 func addPersons(client pb.ManageClient, persons []*pb.Person) bool { ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second) defer cancel() stream, err := client.AddPersons(ctx) if err != nil { log.Printf("client.AddPersons failed, error: %v\n", err) return false } for _, person := range persons { if err := stream.Send(person); err != nil { log.Printf("stream.Send failed, error: %v\n", err) return false } } res, err := stream.CloseAndRecv() if err != nil { log.Printf("stream.CloseAndRecv failed, error: %v\n", err) return false } return res.Success }// getPersonsLimit 用来获取指定数目的个人信息 func getPersonsLimit(client pb.ManageClient, limitNum int32) ([]*pb.Person, error) { var persons []*pb.Person ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second) defer cancel() num := pb.ReqNum{ Num: limitNum, } stream, err := client.GetPersonsLimit(ctx, &num) if err != nil { log.Printf("client.GetPersonsLimit failed, error: %v\n", err) return persons, err } for { person, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Printf("stream.Recv failed, error: %v\n", err) return persons, err } persons = append(persons, person) } return persons, nil }// getPersons 用来获取指定名字的所有个人信息 func getPersons(client pb.ManageClient, personNames []string) ([]*pb.Person, error) { ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second) defer cancel() stream, err := client.GetPersons(ctx) if err != nil { log.Printf("client.GetPersons failed, error: %v\n", err) return nil, err } waitc := make(chan struct{}) // 发送个人名字信息 go func() { for _, personName := range personNames { name := pb.ReqName{ Name: personName, } if err := stream.Send(&name); err != nil { log.Printf("stream.Send failed, error: %v\n", err) break } } err := stream.CloseSend() if err != nil { log.Printf("stream.CloseSend failed, error: %v\n", err) } close(waitc) }() // 获取对应的所有个人信息 var persons []*pb.Person var in *pb.Person for { in, err = stream.Recv() if err != nil { break } persons = append(persons, in) } <-waitc // 检查读取结果, err应该不会为nil if err == io.EOF || err == nil { return persons, nil } log.Fatalf("stream.Recv failed, error: %v\n", err) return persons, err }func makePerson(name string, id int32, email string) pb.Person { return pb.Person{ Name:name, Id:id, Email: email, } }func printPersons(persons []*pb.Person) { for _, person := range persons { fmt.Printf("%+v\n", person) } fmt.Println("") }func main() { var opts []grpc.DialOption opts = append(opts, grpc.WithInsecure()) conn, err := grpc.Dial("localhost:50001", opts...) if err != nil { log.Fatalf("grpc.Dial failed, error: %v\n", err) } defer conn.Close() client := pb.NewManageClient(conn) person := makePerson("Tom", 1, "tom@gmail.com") suc := addPerson(client, &person) if !suc { log.Fatalf("addPerson failed.\n") } person = makePerson("Lilly", 2, "lilly@gmail.com") person2 := makePerson("Jim", 3, "jim@gmail.com") persons := []*pb.Person{&person, &person2} suc = addPersons(client, persons) if !suc { log.Fatalf("addPersons failed.\n") } resPersons, err := getPersonsLimit(client, 5) if err != nil { log.Fatalf("getPersonsLimit failed, error: %v\n", err) } fmt.Println("getPersonsLimit output:") printPersons(resPersons) var personNames []string for _, person := range persons { personNames = append(personNames, person.GetName()) } resPersons, err = getPersons(client, personNames) if err != nil { log.Fatalf("getPersons failed, error: %v\n", err) } fmt.Println("getPersons output:") printPersons(resPersons) }


这个我没有使用单元测试, 可能使用单元测试会更好, 不过根据客户端代码和输出, 也可以验证服务的正确性.
完整的代码参考: https://github.com/ss-torres/personservice.git
如果有什么建议或者提议, 欢迎提出
【grpc的简单用例|grpc的简单用例 (golang实现)】转载于:https://www.cnblogs.com/albizzia/p/10836948.html

    推荐阅读