Golang|go-kit实践之5(go-kit微服务请求跟踪实现)

一、介绍 go-kit 提供了两种tracing请求跟踪
1、opentracing【跟踪标准】
2、zipkin【zipkin的go封装】
我们下面来介绍下zipkin在go-kit中的使用方法。
二、zipkin安装启动 1、ZipKin入门介绍
Zipkin是一款开源的分布式实时数据追踪系统(Distributed Tracking System),基于 Google Dapper的论文设计而来,由 Twitter 公司开发贡献。其主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。
2、ZipKin架构
ZipKin可以分为两部分,一部分是zipkin server,用来作为数据的采集存储、数据分析与展示;zipkin client是zipkin基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能,架构如下:
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

Zipkin Server主要包括四个模块:
(1)Collector 接收或收集各应用传输的数据
(2)Storage 存储接受或收集过来的数据,当前支持Memory,MySQL,Cassandra,ElasticSearch等,默认存储在内存中。
(3)API(Query) 负责查询Storage中存储的数据,提供简单的JSON API获取数据,主要提供给web UI使用
(4)Web 提供简单的web界面
服务追踪流程如下:

┌─────────────┐ ┌───────────────────────┐┌─────────────┐┌──────────────────┐ │ User Code│ │ Trace Instrumentation ││ Http Client ││ Zipkin Collector │ └─────────────┘ └───────────────────────┘└─────────────┘└──────────────────┘ ││││ ┌─────────┐ │ ──┤GET /foo ├─? │ ────┐││ └─────────┘│ record tags ││ ?───┘││ ────┐ │││ add trace headers ││ ?───┘ ││ ────┐││ │ record timestamp ││ ?───┘││ ┌─────────────────┐ ││ ──┤GET /foo├─? ││ │X-B3-TraceId: aa │────┐ │││X-B3-SpanId: 6b││││ └─────────────────┘│ invoke ││││ request│ │ │││││ ┌────────┐?───┘ ││ ?─────┤200 OK├─────── ││ ────┐ └────────┘ │││ record duration││ ┌────────┐?───┘ │ ?──┤200 OK├── │││ └────────┘┌────────────────────────────────┐ ││ ──┤ asynchronously report span├────? │ ││ │{│ │"traceId": "aa",│ │"id": "6b",│ │"name": "get",│ │"timestamp": 1483945573944000,│ │"duration": 386000,│ │"annotations": [│ │--snip--│ └────────────────────────────────┘

Instrumented client和server是分别使用了ZipKin Client的服务,Zipkin Client会根据配置将追踪数据发送到Zipkin Server中进行数据存储、分析和展示。
3、ZipKin几个概念
在追踪日志中,有几个基本概念spanId、traceId、parentId
traceId:用来确定一个追踪链的16字符长度的字符串,在某个追踪链中保持不变。
spanId:区域Id,在一个追踪链中spanId可能存在多个,每个spanId用于表明在某个服务中的身份,也是16字符长度的字符串。
parentId:在跨服务调用者的spanId会传递给被调用者,被调用者会将调用者的spanId作为自己的parentId,然后自己再生成spanId。
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

4、window安装zipkin
(1)下载:
下载地址:https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec
(2)运行:
进入到下载的文件路径下,执行:java -jar xxx(注:xxx:为下载的文件名,譬如下载的版本为:zipkin-server-2.12.8-exec.jar,则执行:java -jar zipkin-server-2.12.8-exec.jar 。特别说明:Java-jar 需要安装jdk)
(3)访问:zipkin Server 运行后默认的访问地址:http://localhost:9411
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

(4)调用链分析
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

三、go-kit的zipkin 1、核心代码说明
服务端trace:
//创建zipkin上报管理器 reporter := http.NewReporter("http://localhost:9411/api/v2/spans")//运行结束,关闭上报管理器的for-select协程 defer reporter.Close()//创建trace跟踪器 zkTracer, err := opzipkin.NewTracer(reporter)//添加grpc请求的before after finalizer 事件对应要处理的trace操作方法 zkServerTrace := zipkin.GRPCServerTrace(zkTracer)//通过options的方式运行trace bookListHandler := grpctransport.NewServer( bookListEndPoint, decodeRequest, encodeResponse, zkServerTrace, )

客户端trance:
与服务端trace的区别在于kitzipkin.GRPCClientTrace
reporter := http.NewReporter("http://localhost:9411/api/v2/spans") defer reporter.Close()zkTracer, err := opzipkin.NewTracer(reporter) zkClientTrace := zipkin.GRPCClientTrace(zkTracer)

可以通过span组装span结构树
parentSpan := zkTracer.StartSpan("bookCaller") defer parentSpan.Flush() ctx = opzipkin.NewContext(context.Background(), parentSpan)

2、实例
1、protobuf文件及生成对应的go文件
syntax = "proto3"; // 请求书详情的参数结构book_id 32位整形 message BookInfoParams { int32 book_id = 1; } // 书详情信息的结构book_name字符串类型 message BookInfo { int32 book_id = 1; stringbook_name = 2; } // 请求书列表的参数结构page、limit32位整形 message BookListParams { int32 page = 1; int32 limit = 2; } // 书列表的结构BookInfo结构数组 message BookList { repeated BookInfo book_list = 1; } // 定义 获取书详情和 书列表服务入参出参分别为上面所定义的结构 service BookService { rpc GetBookInfo (BookInfoParams) returns (BookInfo) {} rpc GetBookList (BookListParams) returns (BookList) {} }

生成对应的go语言代码文件:protoc --go_out=plugins=grpc:. book.proto(其中:protobuf文件名为:book.proto)
2、Server端代码
package mainimport ( "MyKit" "context" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/sd/etcdv3" "github.com/go-kit/kit/tracing/zipkin" grpctransport "github.com/go-kit/kit/transport/grpc" opzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/reporter/http" "golang.org/x/time/rate" "google.golang.org/grpc" "math/rand" "net" "time" )type BookServer struct { bookListHandler grpctransport.Handler bookInfoHandler grpctransport.Handler }//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理 func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) { _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in) if err != nil { return nil, err } return rsp.(*book.BookInfo), err }//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理 func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) { _, rsp, err := s.bookListHandler.ServeGRPC(ctx, in) if err != nil { return nil, err } return rsp.(*book.BookList), err }//创建bookList的EndPoint func makeGetBookListEndpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { rand.Seed(time.Now().Unix()) randInt := rand.Int63n(200) time.Sleep(time.Duration(randInt) * time.Millisecond) //请求列表时返回 书籍列表 bl := new(book.BookList) bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 1, BookName: "Go入门到精通"}) bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "微服务入门到精通"}) bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "区块链入门到精通"}) return bl, nil } }//创建bookInfo的EndPoint func makeGetBookInfoEndpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { rand.Seed(time.Now().Unix()) randInt := rand.Int63n(200) time.Sleep(time.Duration(randInt) * time.Microsecond) //请求详情时返回 书籍信息 req := request.(*book.BookInfoParams) b := new(book.BookInfo) b.BookId = req.BookId b.BookName = "Go入门系列" return b, nil } }func decodeRequest(_ context.Context, req interface{}) (interface{}, error) { return req, nil }func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) { return rsp, nil }func main() { var ( //etcd服务地址 etcdServer = "127.0.0.1:2379" //服务的信息目录 prefix = "/services/book/" //当前启动服务实例的地址 instance = "127.0.0.1:50051" //服务实例注册的路径 key = prefix + instance //服务实例注册的val value = https://www.it610.com/article/instance ctx= context.Background() //服务监听地址 serviceAddress =":50051" ) //etcd的连接参数 options := etcdv3.ClientOptions{ DialTimeout:time.Second * 3, DialKeepAlive: time.Second * 3, } //创建etcd连接 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options) if err != nil { panic(err) } // 创建注册器 registrar := etcdv3.NewRegistrar(client, etcdv3.Service{ Key:key, Value: value, }, log.NewNopLogger()) // 注册器启动注册 registrar.Register() //启动追踪 reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址 defer reporter.Close() zkTracer, err := opzipkin.NewTracer(reporter)//实例化追踪器 zkServerTrace := zipkin.GRPCServerTrace(zkTracer) //追踪器Server端 bookServer := new(BookServer) bookListEndPoint := makeGetBookListEndpoint() //创建限流器 1r/slimiter := rate.NewLimiter(rate.Every(time.Second * 1), 100000) //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint limiter := rate.NewLimiter(rate.Every(time.Second*1), 1) //限流1秒,临牌数:1 bookListEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookListEndPoint) bookListHandler := grpctransport.NewServer( bookListEndPoint, decodeRequest, encodeResponse, zkServerTrace, //添加追踪 ) bookServer.bookListHandler = bookListHandler bookInfoEndPoint := makeGetBookInfoEndpoint() //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint) bookInfoHandler := grpctransport.NewServer( bookInfoEndPoint, decodeRequest, encodeResponse, zkServerTrace, ) bookServer.bookInfoHandler = bookInfoHandler ls, _ := net.Listen("tcp", serviceAddress) gs := grpc.NewServer(grpc.UnaryInterceptor(grpctransport.Interceptor)) book.RegisterBookServiceServer(gs, bookServer) gs.Serve(ls) }

3、Client端代码
package mainimport ( "MyKit" "context" "fmt" "github.com/afex/hystrix-go/hystrix" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/etcdv3" "github.com/go-kit/kit/sd/lb" "github.com/go-kit/kit/tracing/zipkin" grpctransport "github.com/go-kit/kit/transport/grpc" opzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/reporter/http" "google.golang.org/grpc" "io" "time" )func main() { commandName := "my-endpoint" hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ Timeout:1000 * 30, ErrorPercentThreshold:1, SleepWindow:10000, MaxConcurrentRequests:1000, RequestVolumeThreshold: 5, }) breakerMw := circuitbreaker.Hystrix(commandName) var ( //注册中心地址 etcdServer = "127.0.0.1:2379" //监听的服务前缀 prefix = "/services/book/" ctx= context.Background() ) options := etcdv3.ClientOptions{ DialTimeout:time.Second * 3, DialKeepAlive: time.Second * 3, } //连接注册中心 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options) if err != nil { panic(err) } logger := log.NewNopLogger() //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据 instancer, err := etcdv3.NewInstancer(client, prefix, logger) if err != nil { panic(err) } //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //创建负载均衡器 balancer := lb.NewRoundRobin(endpointer) /** 我们可以通过负载均衡器直接获取请求的endPoint,发起请求 reqEndPoint,_ := balancer.Endpoint() */ /** 也可以通过retry定义尝试次数进行请求 */ reqEndPoint := lb.Retry(3, 100*time.Second, balancer) //增加熔断中间件 reqEndPoint = breakerMw(reqEndPoint) //现在我们可以通过 endPoint 发起请求了 req := struct{}{} for i := 1; i <= 30; i++ { if _, err = reqEndPoint(ctx, req); err != nil { fmt.Println(err) } } }//通过传入的 实例地址创建对应的请求endPoint func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) { return func(ctx context.Context, request interface{}) (interface{}, error) { fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure()) if err != nil { fmt.Println(err) panic("connect error") }//追踪设置 reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址 defer reporter.Close()zkTracer, err := opzipkin.NewTracer(reporter)//新建追踪器 zkClientTrace := zipkin.GRPCClientTrace(zkTracer) //启动追踪器Client端bookInfoRequest := grpctransport.NewClient( conn, "BookService", "GetBookInfo", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookInfo{}, zkClientTrace, //追踪客户端 ).Endpoint()bookListRequest := grpctransport.NewClient( conn, "BookService", "GetBookList", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookList{}, zkClientTrace, ).Endpoint()parentSpan := zkTracer.StartSpan("bookCaller") defer parentSpan.Flush()ctx = opzipkin.NewContext(ctx, parentSpan) infoRet, _ := bookInfoRequest(ctx, request) bi := infoRet.(*book.BookInfo) fmt.Println("获取书籍详情") fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)listRet, _ := bookListRequest(ctx, request) bl := listRet.(*book.BookList) fmt.Println("获取书籍列表") for _, b := range bl.BookList { fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName) }return nil, nil }, nil, nil }

3、运行
1、启动etcd
2、启动zipkin
3、运行Server端
4、运行Client端
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

5、查看zipkin中的记录
访问http://localhost:9411/zipkin/
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

之后就可以看到记录的数据。
Golang|go-kit实践之5(go-kit微服务请求跟踪实现)
文章图片

【Golang|go-kit实践之5(go-kit微服务请求跟踪实现)】

    推荐阅读