Golang | go-kit in practice, part 1: building microservices with go-kit and gRPC (using go-kit endpoints)

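go-kit is a toolkit for developing microservices. It addresses most of the common problems in microservice systems, so users can concentrate on business logic.
gRPC lacks service-governance features, so we can combine go-kit with gRPC to cover the full set of requirements. go-kit's endpoint abstraction makes it easy to wrap the protocols used by other microservice frameworks. Endpoints give servers and clients building blocks based on RPC methods.
Let's first look at the endpoint source code: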

package endpoint

import (
	"context"
)

// Endpoint is the fundamental building block of servers and clients.
// It represents a single RPC method.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

// Nop is an endpoint that does nothing and returns a nil error.
// Useful for tests.
func Nop(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }

// Middleware is a chainable behavior modifier for endpoints.
type Middleware func(Endpoint) Endpoint

// Chain is a helper function for composing middlewares. Requests will
// traverse them in the order they're declared. That is, the first middleware
// is treated as the outermost middleware.
func Chain(outer Middleware, others ...Middleware) Middleware {
	return func(next Endpoint) Endpoint {
		for i := len(others) - 1; i >= 0; i-- { // reverse
			next = others[i](next)
		}
		return outer(next)
	}
}

// Failer may be implemented by Go kit response types that contain business
// logic error details. If Failed returns a non-nil error, the Go kit transport
// layer may interpret this as a business logic error, and may encode it
// differently than a regular, successful response.
//
// It's not necessary for your response types to implement Failer, but it may
// help for more sophisticated use cases. The addsvc example shows how Failer
// should be used by a complete application.
type Failer interface {
	Failed() error
}
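To make the ordering rule in the Chain comment concrete, here is a minimal sketch. It uses local copies of Endpoint, Middleware and Chain so that it runs without go-kit installed; annotate and runChain are hypothetical helpers invented for this illustration, not part of go-kit.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local copies of the go-kit types shown above, so the sketch needs no external packages.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)
type Middleware func(Endpoint) Endpoint

// Chain is copied from the source above: the first middleware ends up outermost.
func Chain(outer Middleware, others ...Middleware) Middleware {
	return func(next Endpoint) Endpoint {
		for i := len(others) - 1; i >= 0; i-- {
			next = others[i](next)
		}
		return outer(next)
	}
}

// annotate is a hypothetical middleware that records when it is entered and left.
func annotate(name string, trace *[]string) Middleware {
	return func(next Endpoint) Endpoint {
		return func(ctx context.Context, request interface{}) (interface{}, error) {
			*trace = append(*trace, name+" in")
			defer func() { *trace = append(*trace, name+" out") }()
			return next(ctx, request)
		}
	}
}

// runChain wraps a trivial endpoint in three middlewares, calls it once,
// and returns the order in which the layers were visited.
func runChain() string {
	var trace []string
	base := func(ctx context.Context, request interface{}) (interface{}, error) {
		trace = append(trace, "endpoint")
		return request, nil
	}
	wrapped := Chain(
		annotate("first", &trace),
		annotate("second", &trace),
		annotate("third", &trace),
	)(base)
	_, _ = wrapped(context.Background(), "request")
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(runChain())
}
```

The request passes through "first", then "second", then "third" before reaching the endpoint, and the response unwinds in the opposite order, which is what "the first middleware is treated as the outermost" means.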

go-kit provides the following features:
1. Circuit breaker
2. Rate limiter
3. Logging
4. Metrics (Prometheus)
5. Request tracing
6. Service discovery and load balancing
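Most of these features attach to a service as endpoint middleware. As an illustration only, the sketch below builds a rate-limiting middleware from scratch with the standard library. The Allower interface, the budget type, limit and countAllowed are assumptions made for this example (go-kit's own ratelimit package has a similar shape, but this is not its code), and the fixed call budget stands in for a real time-based limiter.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the endpoint types, so the sketch needs no external packages.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)
type Middleware func(Endpoint) Endpoint

// Allower is a hypothetical one-method interface: may this call proceed?
type Allower interface {
	Allow() bool
}

// ErrLimited is returned when a call is rejected by the limiter.
var ErrLimited = errors.New("rate limit exceeded")

// budget is a hypothetical Allower that permits a fixed number of calls.
// It is not safe for concurrent use; a real limiter would be.
type budget struct {
	remaining int
}

func (b *budget) Allow() bool {
	if b.remaining <= 0 {
		return false
	}
	b.remaining--
	return true
}

// limit returns a middleware that rejects calls once the Allower says no.
func limit(a Allower) Middleware {
	return func(next Endpoint) Endpoint {
		return func(ctx context.Context, request interface{}) (interface{}, error) {
			if !a.Allow() {
				return nil, ErrLimited
			}
			return next(ctx, request)
		}
	}
}

// countAllowed makes `calls` requests through a limiter with budget `max`
// and reports how many of them reached the endpoint.
func countAllowed(calls, max int) int {
	echo := func(ctx context.Context, request interface{}) (interface{}, error) {
		return request, nil
	}
	limited := limit(&budget{remaining: max})(echo)
	allowed := 0
	for i := 0; i < calls; i++ {
		if _, err := limited(context.Background(), i); err == nil {
			allowed++
		}
	}
	return allowed
}

func main() {
	fmt.Println("allowed:", countAllowed(5, 2))
}
```

Because the limiter is just a Middleware, it can be combined with logging or tracing middlewares through Chain without touching the business endpoint.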
1. Install the go-kit package
git clone https://github.com/go-kit/kit.git

Place it in the corresponding directory under your GOPATH.
2. The protobuf file
syntax = "proto3";

// Parameters for requesting book details; book_id is a 32-bit integer.
message BookInfoParams {
	int32 book_id = 1;
}

// Book details; book_name is a string.
message BookInfo {
	int32 book_id = 1;
	string book_name = 2;
}

// Parameters for requesting a book list; page and limit are 32-bit integers.
message BookListParams {
	int32 page = 1;
	int32 limit = 2;
}

// Book list: an array of BookInfo messages.
message BookList {
	repeated BookInfo book_list = 1;
}

// The book-detail and book-list services; their request and response
// types are the messages defined above.
service BookService {
	rpc GetBookInfo (BookInfoParams) returns (BookInfo) {}
	rpc GetBookList (BookListParams) returns (BookList) {}
}

Generate the corresponding Go code with: protoc --go_out=plugins=grpc:. book.proto (the protobuf file is named book.proto). The generated code is as follows:
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: book.proto

package book

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// Parameters for requesting book details; book_id is a 32-bit integer.
type BookInfoParams struct {
	BookId               int32    `protobuf:"varint,1,opt,name=book_id,json=bookId,proto3" json:"book_id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *BookInfoParams) Reset()         { *m = BookInfoParams{} }
func (m *BookInfoParams) String() string { return proto.CompactTextString(m) }
func (*BookInfoParams) ProtoMessage()    {}
func (*BookInfoParams) Descriptor() ([]byte, []int) {
	return fileDescriptor_1e89d0eaa98dc5d8, []int{0}
}

func (m *BookInfoParams) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_BookInfoParams.Unmarshal(m, b)
}
func (m *BookInfoParams) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_BookInfoParams.Marshal(b, m, deterministic)
}
func (m *BookInfoParams) XXX_Merge(src proto.Message) {
	xxx_messageInfo_BookInfoParams.Merge(m, src)
}
func (m *BookInfoParams) XXX_Size() int {
	return xxx_messageInfo_BookInfoParams.Size(m)
}
func (m *BookInfoParams) XXX_DiscardUnknown() {
	xxx_messageInfo_BookInfoParams.DiscardUnknown(m)
}

var xxx_messageInfo_BookInfoParams proto.InternalMessageInfo

func (m *BookInfoParams) GetBookId() int32 {
	if m != nil {
		return m.BookId
	}
	return 0
}

// Book details; book_name is a string.
type BookInfo struct {
	BookId               int32    `protobuf:"varint,1,opt,name=book_id,json=bookId,proto3" json:"book_id,omitempty"`
	BookName             string   `protobuf:"bytes,2,opt,name=book_name,json=bookName,proto3" json:"book_name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *BookInfo) Reset()         { *m = BookInfo{} }
func (m *BookInfo) String() string { return proto.CompactTextString(m) }
func (*BookInfo) ProtoMessage()    {}
func (*BookInfo) Descriptor() ([]byte, []int) {
	return fileDescriptor_1e89d0eaa98dc5d8, []int{1}
}

func (m *BookInfo) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_BookInfo.Unmarshal(m, b)
}
func (m *BookInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_BookInfo.Marshal(b, m, deterministic)
}
func (m *BookInfo) XXX_Merge(src proto.Message) {
	xxx_messageInfo_BookInfo.Merge(m, src)
}
func (m *BookInfo) XXX_Size() int {
	return xxx_messageInfo_BookInfo.Size(m)
}
func (m *BookInfo) XXX_DiscardUnknown() {
	xxx_messageInfo_BookInfo.DiscardUnknown(m)
}

var xxx_messageInfo_BookInfo proto.InternalMessageInfo

func (m *BookInfo) GetBookId() int32 {
	if m != nil {
		return m.BookId
	}
	return 0
}

func (m *BookInfo) GetBookName() string {
	if m != nil {
		return m.BookName
	}
	return ""
}

// Parameters for requesting a book list; page and limit are 32-bit integers.
type BookListParams struct {
	Page                 int32    `protobuf:"varint,1,opt,name=page,proto3" json:"page,omitempty"`
	Limit                int32    `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *BookListParams) Reset()         { *m = BookListParams{} }
func (m *BookListParams) String() string { return proto.CompactTextString(m) }
func (*BookListParams) ProtoMessage()    {}
func (*BookListParams) Descriptor() ([]byte, []int) {
	return fileDescriptor_1e89d0eaa98dc5d8, []int{2}
}

func (m *BookListParams) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_BookListParams.Unmarshal(m, b)
}
func (m *BookListParams) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_BookListParams.Marshal(b, m, deterministic)
}
func (m *BookListParams) XXX_Merge(src proto.Message) {
	xxx_messageInfo_BookListParams.Merge(m, src)
}
func (m *BookListParams) XXX_Size() int {
	return xxx_messageInfo_BookListParams.Size(m)
}
func (m *BookListParams) XXX_DiscardUnknown() {
	xxx_messageInfo_BookListParams.DiscardUnknown(m)
}

var xxx_messageInfo_BookListParams proto.InternalMessageInfo

func (m *BookListParams) GetPage() int32 {
	if m != nil {
		return m.Page
	}
	return 0
}

func (m *BookListParams) GetLimit() int32 {
	if m != nil {
		return m.Limit
	}
	return 0
}

// Book list: an array of BookInfo messages.
type BookList struct {
	BookList             []*BookInfo `protobuf:"bytes,1,rep,name=book_list,json=bookList,proto3" json:"book_list,omitempty"`
	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
	XXX_unrecognized     []byte      `json:"-"`
	XXX_sizecache        int32       `json:"-"`
}

func (m *BookList) Reset()         { *m = BookList{} }
func (m *BookList) String() string { return proto.CompactTextString(m) }
func (*BookList) ProtoMessage()    {}
func (*BookList) Descriptor() ([]byte, []int) {
	return fileDescriptor_1e89d0eaa98dc5d8, []int{3}
}

func (m *BookList) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_BookList.Unmarshal(m, b)
}
func (m *BookList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_BookList.Marshal(b, m, deterministic)
}
func (m *BookList) XXX_Merge(src proto.Message) {
	xxx_messageInfo_BookList.Merge(m, src)
}
func (m *BookList) XXX_Size() int {
	return xxx_messageInfo_BookList.Size(m)
}
func (m *BookList) XXX_DiscardUnknown() {
	xxx_messageInfo_BookList.DiscardUnknown(m)
}

var xxx_messageInfo_BookList proto.InternalMessageInfo

func (m *BookList) GetBookList() []*BookInfo {
	if m != nil {
		return m.BookList
	}
	return nil
}

func init() {
	proto.RegisterType((*BookInfoParams)(nil), "BookInfoParams")
	proto.RegisterType((*BookInfo)(nil), "BookInfo")
	proto.RegisterType((*BookListParams)(nil), "BookListParams")
	proto.RegisterType((*BookList)(nil), "BookList")
}

func init() { proto.RegisterFile("book.proto", fileDescriptor_1e89d0eaa98dc5d8) }

var fileDescriptor_1e89d0eaa98dc5d8 = []byte{
	// 224 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0xca, 0xcf, 0xcf,
	0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0xd2, 0xe4, 0xe2, 0x73, 0xca, 0xcf, 0xcf, 0xf6, 0xcc,
	0x4b, 0xcb, 0x0f, 0x48, 0x2c, 0x4a, 0xcc, 0x2d, 0x16, 0x12, 0xe7, 0x62, 0x07, 0xc9, 0xc7, 0x67,
	0xa6, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0xb0, 0x06, 0xb1, 0x81, 0xb8, 0x9e, 0x29, 0x4a, 0x0e, 0x5c,
	0x1c, 0x30, 0xa5, 0x38, 0x15, 0x09, 0x49, 0x73, 0x71, 0x82, 0x25, 0xf2, 0x12, 0x73, 0x53, 0x25,
	0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x38, 0x40, 0x02, 0x7e, 0x89, 0xb9, 0xa9, 0x4a, 0x56, 0x10,
	0xcb, 0x7c, 0x32, 0x8b, 0x4b, 0xa0, 0x96, 0x09, 0x71, 0xb1, 0x14, 0x24, 0xa6, 0xa7, 0x42, 0x0d,
	0x01, 0xb3, 0x85, 0x44, 0xb8, 0x58, 0x73, 0x32, 0x73, 0x33, 0x4b, 0xc0, 0xda, 0x59, 0x83, 0x20,
	0x1c, 0x25, 0x23, 0x88, 0xed, 0x20, 0xbd, 0x42, 0x6a, 0x50, 0x4b, 0x72, 0x32, 0x8b, 0x4b, 0x24,
	0x18, 0x15, 0x98, 0x35, 0xb8, 0x8d, 0x38, 0xf5, 0x60, 0x6e, 0x83, 0xd8, 0x07, 0x52, 0x67, 0x94,
	0xce, 0xc5, 0x0d, 0x12, 0x0d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0xd2, 0xe6, 0xe2, 0x76,
	0x4f, 0x2d, 0x81, 0xfb, 0x81, 0x5f, 0x0f, 0xd5, 0xe7, 0x52, 0x08, 0x33, 0x94, 0x18, 0x90, 0x14,
	0x83, 0xad, 0x84, 0x28, 0x46, 0xb8, 0x1c, 0xaa, 0x18, 0x24, 0xa0, 0xc4, 0x90, 0xc4, 0x06, 0x0e,
	0x4c, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3e, 0x22, 0x2f, 0xab, 0x5a, 0x01, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4

// BookServiceClient is the client API for BookService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type BookServiceClient interface {
	GetBookInfo(ctx context.Context, in *BookInfoParams, opts ...grpc.CallOption) (*BookInfo, error)
	GetBookList(ctx context.Context, in *BookListParams, opts ...grpc.CallOption) (*BookList, error)
}

type bookServiceClient struct {
	cc *grpc.ClientConn
}

func NewBookServiceClient(cc *grpc.ClientConn) BookServiceClient {
	return &bookServiceClient{cc}
}

func (c *bookServiceClient) GetBookInfo(ctx context.Context, in *BookInfoParams, opts ...grpc.CallOption) (*BookInfo, error) {
	out := new(BookInfo)
	err := c.cc.Invoke(ctx, "/BookService/GetBookInfo", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *bookServiceClient) GetBookList(ctx context.Context, in *BookListParams, opts ...grpc.CallOption) (*BookList, error) {
	out := new(BookList)
	err := c.cc.Invoke(ctx, "/BookService/GetBookList", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// BookServiceServer is the server API for BookService service.
type BookServiceServer interface {
	GetBookInfo(context.Context, *BookInfoParams) (*BookInfo, error)
	GetBookList(context.Context, *BookListParams) (*BookList, error)
}

// UnimplementedBookServiceServer can be embedded to have forward compatible implementations.
type UnimplementedBookServiceServer struct {
}

func (*UnimplementedBookServiceServer) GetBookInfo(ctx context.Context, req *BookInfoParams) (*BookInfo, error) {
	return nil, status.Errorf(codes.Unimplemented, "method GetBookInfo not implemented")
}
func (*UnimplementedBookServiceServer) GetBookList(ctx context.Context, req *BookListParams) (*BookList, error) {
	return nil, status.Errorf(codes.Unimplemented, "method GetBookList not implemented")
}

func RegisterBookServiceServer(s *grpc.Server, srv BookServiceServer) {
	s.RegisterService(&_BookService_serviceDesc, srv)
}

func _BookService_GetBookInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(BookInfoParams)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(BookServiceServer).GetBookInfo(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/BookService/GetBookInfo",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(BookServiceServer).GetBookInfo(ctx, req.(*BookInfoParams))
	}
	return interceptor(ctx, in, info, handler)
}

func _BookService_GetBookList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(BookListParams)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(BookServiceServer).GetBookList(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/BookService/GetBookList",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(BookServiceServer).GetBookList(ctx, req.(*BookListParams))
	}
	return interceptor(ctx, in, info, handler)
}

var _BookService_serviceDesc = grpc.ServiceDesc{
	ServiceName: "BookService",
	HandlerType: (*BookServiceServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "GetBookInfo",
			Handler:    _BookService_GetBookInfo_Handler,
		},
		{
			MethodName: "GetBookList",
			Handler:    _BookService_GetBookList_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "book.proto",
}

3. Server code
package main

import (
	"context"
	"log"
	"net"

	book "MyKit" // book.pb.go, the file generated by protoc

	"github.com/go-kit/kit/endpoint"
	grpc_transport "github.com/go-kit/kit/transport/grpc"
	"google.golang.org/grpc"
)

type BookServer struct {
	bookListHandler grpc_transport.Handler
	bookInfoHandler grpc_transport.Handler
}

// GetBookInfo is called through gRPC. It only passes the data through:
// it calls ServeGRPC on the matching handler in BookServer, which hands
// the request over to go-kit.
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
	_, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookInfo), nil
}

// GetBookList is called through gRPC. It only passes the data through:
// it calls ServeGRPC on the matching handler in BookServer, which hands
// the request over to go-kit.
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
	_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookList), nil
}

// makeGetBookListEndpoint creates the endpoint for the book list.
func makeGetBookListEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		// Return the list of books when the list is requested.
		bl := new(book.BookList)
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 1, BookName: "Go入门到精通"})
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "go-kit入门到精通"})
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 3, BookName: "go-micro入门到精通"})
		return bl, nil
	}
}

// makeGetBookInfoEndpoint creates the endpoint for book details.
func makeGetBookInfoEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		// Return the book information when details are requested.
		req := request.(*book.BookInfoParams)
		b := new(book.BookInfo)
		b.BookId = req.BookId
		b.BookName = "Go与微服务"
		return b, nil
	}
}

// decodeRequest passes the gRPC request through unchanged.
func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
	return req, nil
}

// encodeResponse passes the endpoint response through unchanged.
func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
	return rsp, nil
}

func main() {
	// Wrap BookServer.
	bookServer := new(BookServer)

	// Create the handler for bookList.
	bookListHandler := grpc_transport.NewServer(
		makeGetBookListEndpoint(),
		decodeRequest,
		encodeResponse,
	)
	// Attach the go-kit bookList handling to bookServer.
	bookServer.bookListHandler = bookListHandler

	// Create the handler for bookInfo.
	bookInfoHandler := grpc_transport.NewServer(
		makeGetBookInfoEndpoint(),
		decodeRequest,
		encodeResponse,
	)
	// Attach the go-kit bookInfo handling to bookServer.
	bookServer.bookInfoHandler = bookInfoHandler

	// Start the gRPC service.
	serviceAddress := ":50052"
	ls, err := net.Listen("tcp", serviceAddress)
	if err != nil {
		log.Fatalf("listen on %s: %v", serviceAddress, err)
	}
	gs := grpc.NewServer()
	book.RegisterBookServiceServer(gs, bookServer)
	if err := gs.Serve(ls); err != nil {
		log.Fatalf("serve: %v", err)
	}
}

// TODO: go-kit reference: https://blog.csdn.net/lihao19910921/article/details/80166399
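One fragile spot in the server code is the unchecked type assertion on the value returned by ServeGRPC: if an endpoint or encoder ever returns a different type, the assertion panics. A minimal sketch of a checked conversion follows. BookInfo here is a local stand-in for the generated book.BookInfo type, and asBookInfo is a hypothetical helper, not something go-kit or gRPC provides.

```go
package main

import "fmt"

// BookInfo is a local stand-in for the generated book.BookInfo message.
type BookInfo struct {
	BookId   int32
	BookName string
}

// asBookInfo converts a go-kit response value to *BookInfo and returns
// an error instead of panicking when the dynamic type does not match.
func asBookInfo(rsp interface{}) (*BookInfo, error) {
	info, ok := rsp.(*BookInfo)
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", rsp)
	}
	return info, nil
}

func main() {
	if info, err := asBookInfo(&BookInfo{BookId: 1, BookName: "Go与微服务"}); err == nil {
		fmt.Println("ok:", info.BookName)
	}
	if _, err := asBookInfo("not a book"); err != nil {
		fmt.Println("error:", err)
	}
}
```

In the server above, the same pattern would replace rsp.(*book.BookInfo) so that a wiring mistake surfaces as a gRPC error rather than crashing the process.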

4. Client code
package main

import (
	"context"
	"fmt"

	book "MyKit" // book.pb.go, the file generated by protoc

	"google.golang.org/grpc"
)

func main() {
	serviceAddress := "127.0.0.1:50052"
	conn, err := grpc.Dial(serviceAddress, grpc.WithInsecure())
	if err != nil {
		panic("connect error")
	}
	defer conn.Close()

	bookClient := book.NewBookServiceClient(conn)

	// Fetch the details of one book.
	bi, err := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
	if err != nil {
		panic(err)
	}
	fmt.Print("获取书籍信息:\t")
	fmt.Println("bookInfo:", bi.BookName)

	// Fetch the book list.
	bl, err := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
	if err != nil {
		panic(err)
	}
	fmt.Println("获取书籍列表:\t")
	for _, b := range bl.BookList {
		fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
	}
}

5. Running
Start the server first; it blocks while listening. Then run the client:
[Image: client output]

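6. Notes
go-kit's endpoint abstraction:
A transport Server must have an endpoint, a decodeRequestFunc and an encodeResponseFunc.
1. endpoint: one endpoint represents one RPC, that is, one function in our service interface
2. decodeRequestFunc: the function that decodes the request parameters
3. encodeResponseFunc: the function that encodes the response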
Request flow:
request -> decodeRequestFunc -> endpoint -> encodeResponseFunc -> response
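Note: installing go-kit pulls in quite a few dependencies, and network problems sometimes get in the way (access to sites outside China is usually required). In that case, download the packages from GitHub and move them into the corresponding GOPATH directory (for specific problems you will have to rely on your own experience and Baidu O(∩_∩)O).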
