万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践

作者:翟东波、叶书俊
在微服务体系架构下,搜狐智能媒体使用 Zipkin 进行服务链路追踪(Tracing)的埋点采集,将采集的 Trace 信息存储到 StarRocks 中。通过 StarRocks 强大的 SQL 计算能力,对 Tracing 信息进行多维度的统计、分析等操作,提升了微服务监控能力,从简单统计的 Monitoring 上升到更多维度探索分析的 Observability。
全文主要分为三个部分:第一节主要介绍微服务下的常用监控方式,其中链路追踪技术,可以串联整个服务调用链路,获得整体服务的关键信息,对微服务的监控有非常重要的意义。第二节主要介绍搜狐智能媒体是如何构建链路追踪分析体系的,主要包括 Zipkin 的数据采集,StarRocks 的数据存储,以及根据应用场景对 StarRocks 进行分析计算等三个部分。第三节主要介绍搜狐智能媒体通过引入 Zipkin 和 StarRocks 进行链路追踪分析取得的一些实践效果。
01 微服务架构中的链路追踪 近年来,企业 IT 应用架构逐步向微服务、云原生等分布式应用架构演进,在搜狐智能媒体内部,应用服务按照微服务、Docker、Kubernetes、Spring Cloud 等架构思想和技术方案进行研发运维,提升部门整体工程效率。
微服务架构提升工程效率的同时,也带来了一些新的问题。微服务是一个分布式架构,它按业务划分服务单元,用户的每次请求不再是由某一个服务独立完成了,而是变成了多个服务一起配合完成。这些服务可能是由不同的团队、使用不同的编程语言实现,可能布在了不同的服务器、甚至不同的数据中心。如果用户请求出现了错误和异常,微服务分布式调用的特性决定了这些故障难以定位,相对于传统的单体架构,微服务监控面临着新的难题。
Logging、Metrics、Tracing
微服务监控可以包含很多方式,按照监测的数据类型主要划分为 Logging、Metrics 和Tracing 三大领域:
Logging 用户主动记录的离散事件,记录的信息一般是非结构化的文本内容,在用户进行问题分析判断时可以提供更为详尽的线索。
具有聚合属性的采集数据,旨在为用户展示某个指标在某个时段的运行状态,用于查看一些指标和趋势。
Tracing 记录一次请求调用的生命周期全过程,其中包括服务调用和处理时长等信息,含有请求上下文环境,由一个全局唯一的 Trace ID 来进行标识和串联整个调用链路,非常适合微服务架构的监控场景。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 1
三者的关系如上图所示,这三者之间也是有重叠的,比如 Logging 可以聚合相关字段生成 Metrics 信息,关联相关字段生成 Tracing 信息;Tracing 可以聚合查询次数生成 Metrics 信息,可以记录业务日志生成 Logging 信息。一般情况下要在 Metrics 和 Logging 中增加字段串联微服务请求调用生命周期比较困难,通过 Tracing 获取 Metrics 和 Logging 则相对容易很多。
另外,这三者对存储资源有着不同的需求,Metrics 是天然的压缩数据,最节省资源;Logging 倾向于无限增加的,甚至会超出预期的容量;Tracing 的存储容量,一般介于 Metrics 和 Logging 两者之间,另外还可通过采样率进一步控制容量需求。
从 Monitoring 到 Observability
Monitoring tells you whether the system works. Observability lets you ask why it's not working.
– Baron Schwarz
微服务监控从数据分析层次,可以简单分为 Monitoring 和 Observability。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

Monitoring 告诉你系统是否在工作,对已知场景的预定义计算,对各种监控问题的事前假设。对应上图 Known Knowns 和 Known Unknowns,都是事先假设可能会发生的事件,包括已经明白和不明白的事件。
Observability 可以让你询问系统为什么不工作,对未知场景的探索式分析,对任意监控问题的事后分析。对应上图 Unknown Knowns 和 Unknown Unknowns,都是事未察觉可能会发生的事件,包括已经明白和不明白的事件。
很显然,通过预先假设所有可能发生事件进行 Monitoring 的方式,已经不能满足微服务复杂的监控场景,我们需要能够提供探索式分析的 Observability 监控方式。在 Logging、Metrics 和 Tracing,Tracing 是目前能提供多维度监控分析能力的最有效方式。
Tracing
链路追踪 Tracing Analysis 为分布式应用的开发者提供了完整的调用链路还原、调用请求量统计、链路拓扑、应用依赖分析等工具,可以帮助开发者快速分析和诊断分布式应用架构下的性能瓶颈,提高微服务时代下的开发诊断效率。
Tracing 可以串联微服务中分布式请求的调用链路,在微服务监控体系中有着重要的作用。另外,Tracing 介于 Metrics 和 Logging 之间,既可以完成 Monitoring 的工作,也可以进行 Observability 的分析,提升监控体系建设效率。
系统模型 链路追踪(Tracing)系统,需要记录一次特定请求经过的上下游服务调用链路,以及各服务所完成的相关工作信息。
如下图所示的微服务系统,用户向服务 A 发起一个请求,服务 A 会生成一个全局唯一的 Trace ID,服务 A 内部 Messaging 方式调用相关处理模块(比如跨线程异步调用等),服务 A 模块再通过 RPC 方式并行调用服务 B 和服务 C;服务 B 会即刻返回响应,但服务 C 会采用串行方式,先用 RPC 调用服务 D,再用 RPC 调用服务 E,然后再响应服务 A 的调用请求;服务 A 在内部两个模块调用处理完后,会响应最初的用户请求。
最开始生成的 Trace ID 会在这一系列的服务内部或服务之间的请求调用中传递,从而将这些请求调用连接起来。另外,Tracing 系统还会记录每一个请求调用处理的 Timestamp、服务名等等相关信息。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 3(注:服务内部串行调用对系统性能有影响,一般采用并行调用方式,后续章节将只考虑并行调用场景。)
在 Tracing 系统中,主要包含 Trace 和 Span 两个基础概念,下图展示了一个由 Span 构成的 Trace。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 4
Trace 指一个外部请求经过的所有服务的调用链路,可以理解为一个有服务调用组成的树状结构,每条链路都有一个全局唯一的 ID 来标识。
Span 指服务内部或服务之间的一次调用,即 Trace 树中的节点,如下图所示的由 Span 构成的 Trace 树,树中的 Span 节点之间存在父子关系。Span 主要包含 Span名称、Span ID、父 ID,以及 Timestamp、Dration(包含子节点调用处理的 duration)、业务数据等其他 log 信息。
Span 根据调用方式可以分为 RPC Span 和 Messaging Span:
RPC Span
由 RPC Tracing 生成,分为 Client 和 Server 两类 Span,分别由 RPC 服务调用的 Client 节点和 Server 节点记录生成,两者共享 Span ID、Parent Span ID 等信息,但要注意,这两个 Span 记录的时间是有偏差,这个偏差是服务间的调用开销,一般是由网络传输开销、代理服务或服务接口消息排队等情况引起的。
Messaging Span
由 Messaging Tracing 生成,一般用于 Tracing 服务内部调用,不同于 RPC Span,Messaging Span 之间不会共享 Span ID 等信息。
应用场景 根据 Tracing 的系统模型,可获得服务响应等各类 Metric 信息,用于 Alerting、DashBoard 查询等;也可根据 Span 组成的链路,分析单个或整体服务情况,发现服务性能瓶颈、网络传输开销、服务内异步调用设计等各种问题。如下图所示,相比于 Metrics 和 Logging,Tracing 可以同时涵盖监控的 Monitoring 和 Observability 场景,在监控体系中占据重要位置,Opentracing、Opencensus、Opentelemetry 等协会和组织都包含对 Tracing 的支持。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 5
从微服务的角度,Tracing 记录的 Span 信息可以进行各种维度的统计和分析。下图基于 HTTP API 设计的微服务系统为例,用户查询 Service1的 /1/api 接口,Service1 再请求 Service2 的 /2/api,Service2 内部异步并发调用 msg2.1 和 msg2.2,msg2.1 请求 Service3的 /3/api接口,msg2.2 请求 Service4 的 /4/api接口,Service3 内部调用 msg3,Service4 再请求 Service5 的 /5/api,其中 Service5 没有进行 Tracing 埋点,无法采集 Service5 的信息。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 6
针对上图的微服务系统,可以进行如下两大类的统计分析操作:
服务内分析
关注单个服务运行情况,比如对外服务接口和上游接口查询的性能指标等,分析场景主要有:
1、上游服务请求
如 Service1 提供的 /1/api ,Service4 提供的 /4/api等,统计获得次数、QPS、耗时百分位数、出错率、超时率等等 metric 信息。
2、下游服务响应
如 Service1 请求的 /2/api 、Service4 请求的 /5/api等,统计查询次数、QPS、耗时百分位数、出错率、超时率等等 Metric 信息。
3、服务内部处理
服务对外接口在内部可能会被分拆为多个 Span,可以按照 Span Name 进行分组聚合统计,发现耗时最长的 Span 等,如 Service2 接口 /2/api ,接口服务内部 Span 包括 /2/api 的 Server Span,call2.1 对应的 Span 和 call2.2 对应的 Span,通过 Span 之间的依赖关系可以算出这些 Span 自身的耗时 Duraion,进行各类统计分析。
服务间分析
在进行微服务整体分析时,我们将单个服务看作黑盒,关注服务间的依赖、调用链路上的服务热点等,分析场景主要有:
1、服务拓扑统计
可以根据服务间调用的 Client Span 和 Server Span,获得整个服务系统的拓扑结构,以及服务之间调用请求次数、Duration 等统计信息。
2、调用链路性能瓶颈分析
分析某个对外请求接口的调用链路上的性能瓶颈,这个瓶颈可能是某个服务内部处理开销造成的,也可能是某两个服务间的网络调用开销等等原因造成的。
对于一次调用涉及到数十个以上微服务的复杂调用请求,每次出现的性能瓶颈很可能都会不一样,此时就需要进行聚合统计,算出性能瓶颈出现频次的排名,分析出针对性能瓶颈热点的服务或服务间调用。
以上仅仅是列举的部分分析场景,Tracing 提供的信息其实可以支持更多的 Metric 统计和探索式分析场景,本文不再一一例举。
02 基于 Zipkin 和 StarRocks 构建链路追踪分析系统 链路追踪系统主要分为数据采集、数据存储和分析计算三大部分,目前使用最广泛的开源链路追踪系统是 Zipkin,它主要包括数据采集和分析计算两大部分,底层的存储依赖其他存储系统。搜狐智能媒体在构建链路追踪系统时,最初采用 Zipkin + ElasticSearch 得方式进行构建,后增加 StarRocks 作为底层存储系统,并基于 StarRocks 进行分析统计,系统总体架构如下图。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 7
数据采集
Zipkin 支持客户端全自动埋点,只需将相关库引入应用程序中并简单配置,就可以实现 Span 信息自动生成,Span 信息通过 HTTP 或 Kafka 等方式自动进行上传。Zipkin 目前提供了绝大部分语言的埋点采集库,如 Java 语言的 Spring Cloud 提供了 Sleuth 与 Zipkin 进行深度绑定,对开发人员基本做到透明使用。为了解决存储空间,在使用时一般要设置 1/100 左右的采样率,Dapper 的论文中提到即便是 1/1000 的采样率,对于跟踪数据的通用使用层面上,也可以提供足够多的信息。
数据模型 对应 图 6,下面给出了 Zipkin Span 埋点采集示意图 (图 8),具体流程如下:
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 8
  1. 用户发送给 Service1 的 Request 中,不含有 Trace 和 Span 信息,Service1 会创建一个 Server Span,随机生成全局唯一的 TraceID(如图中的 X)和 SpanId(如图中的 A,此处的 X 和 A 会使用相同的值),记录 Timestamp 等信息;Service1 在给用户返回 Response 时,Service1 会统计 Server Span 的处理耗时 Duration,会将包含 TraceID、SpanID、Timestamp、Duration 等信息的 Server Span 完整信息进行上报。
  2. Service1 向 Service2 发送的请求,会创建一个 Client Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 B),记录 Timestamp 等信息,同时 Service1 会将 Trace ID(X)和 SpanID(B)传递给 Service2(如在 HTTP 协议的 HEADER 中添加 TraceID 和 SpanID 等相关字段);Service1 在收到 Service2 的响应后,Service1 会处理 Client Span 相关信息,并将 Client Span 进行上报
  3. Service2 收到 Service1 的 Request 中,包含 Trace(X)和 Span(B)等信息,Service2 会创建一个 Server Span,使用 X 作为 Trace ID,B 作为 SpanID,内部调用msg2.1 和 msg2.2 同时,将 Trace ID(X)和 SpanID(B)传递给它们;Service2 在收到 msg2.1 和 msg2.2 的返回后,Service1 会处理 Server Span 相关信息,并将此 Server Span 进行上报
  4. Service2 的 msg2.1 和 msg2.2 会分别创建一个 Messaging Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 C 和 F),记录 Timestamp 等信息,分别向 Service3 和 Service4 发送请求;msg2.1 和 msg2.2 收到响应后,会分别处理 Messaging Span 相关信息,并将两个 Messaging Span 进行上报
  5. Service2 向 Service3 和 Service4 发送的请求,会各创建一个 Client Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 D 和 G),记录 Timestamp 等信息,同时 Service2 会将 Trace ID(X)和 SpanID(D 或 G)传递给 Service3 和 Service4;Service12 在收到 Service3 和 Service3 的响应后,Service2 会分别处理 Client Span 相关信息,并将两个 Client Span 进行上报
  6. Service3 收到 Service2 的Request中,包含 Trace(X)和Span(D)等信息,Service3 会创建一个 Server Span,使用 X 作为 Trace ID,D 作为 SpanID,内部调用 msg3;Service3 在收到 msg3 的返回后,Service3 会处理此 Server Span 相关信息,并将此 Server Span 进行上报
  7. Service3 的 msg3 会分别创建一个 Messaging Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 E),记录 Timestamp 等信息,msg3 处理完成后,处理此 Messaging Span 相关信息,并将此 Messaging Span 进行上报
  8. Service4 收到 Service2 的 Request 中,包含 Trace(X)和 Span(G)等信息,Service4 会创建一个 Server Span,使用 X 作为 Trace ID,G 作为 SpanID,再向 Service5 发送请求;Service4 在收到 Service5 的响应后,Service4 会处理此 Server Span 相关信息,并将此 Server Span 进行上报
  9. Service4 向 Service5 发送的请求,会创建一个 Client Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 H),记录 Timestamp 等信息,同时 Service4 会将 Trace ID(X)和 SpanID(H)传递给 Service5;Service4 在收到 Service5 的响应后,Service4 会处理 Client Span 相关信息,并将此 Client Span 进行上报
上面整个 Trace X 调用链路会生成的 Span 记录如下图,每个 Span 主要会记录 Span Id、Parent Id、Kind(CLIENT 表示 RPC CLIENT 端 Span,SERVER 表示 RPC SERVER 端 SPAN,NULL 表示 Messaging SPAN),SN(Service Name),还会包含 Trace ID,时间戳、Duration 等信息。Service5 没有进行 Zipkin 埋点采集,因此不会有 Service5 的 Span 记录。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 9
数据格式 设置了 Zipkin 埋点的应用服务,默认会使用 Json 格式向 Kafka 上报 Span 信息,上报的信息主要有如下几个注意点:
每个应用服务每次会上报一组 Span,组成一个 Json 数组上报
Json 数组里包含不同 Trace的Span,即不是所有的 Trace ID都 相同
不同形式的接口(如 Http、Grpc、Dubbo 等),除了主要字段相同外,在 tags 中会各自记录一些不同的字段
[ { "traceId": "3112dd04c3112036", "id": "3112dd04c3112036", "kind": "SERVER", "name": "get /2/api", "timestamp": 1618480662355011, "duration": 12769, "localEndpoint": { "serviceName": "SERVICE2", "ipv4": "172.24.132.32" }, "remoteEndpoint": { "ipv4": "111.25.140.166", "port": 50214 }, "tags": { "http.method": "GET", "http.path": "/2/api", "mvc.controller.class": "Controller", "mvc.controller.method": "get2Api" } }, { "traceId": "3112dd04c3112036", "parentId": "3112dd04c3112036", "id": "b4bd9859c690160a", "name": "msg2.1", "timestamp": 1618480662357211, "duration": 11069, "localEndpoint": { "serviceName": "SERVICE2" }, "tags": { "class": "MSG", "method": "msg2.1" } }, { "traceId": "3112dd04c3112036", "parentId": "3112dd04c3112036", "id": "c31d9859c69a2b21", "name": "msg2.2", "timestamp": 1618480662357201, "duration": 10768, "localEndpoint": { "serviceName": "SERVICE2" }, "tags": { "class": "MSG", "method": "msg2.2" } }, { "traceId": "3112dd04c3112036", "parentId": "b4bd9859c690160a", "id": "f1659c981c0f4744", "kind": "CLIENT", "name": "get /3/api", "timestamp": 1618480662358201, "duration": 9206, "localEndpoint": { "serviceName": "SERVICE2", "ipv4": "172.24.132.32" }, "tags": { "http.method": "GET", "http.path": "/3/api" } }, { "traceId": "3112dd04c3112036", "parentId": "c31d9859c69a2b21", "id": "73cd1cab1d72a971", "kind": "CLIENT", "name": "get /4/api", "timestamp": 1618480662358211, "duration": 9349, "localEndpoint": { "serviceName": "SERVICE2", "ipv4": "172.24.132.32" }, "tags": { "http.method": "GET", "http.path": "/4/api" } } ]

万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 10
数据存储
Zipkin 支持 MySQL、Cassandra 和 ElasticSearch 三种数据存储,这三者都存在各自的缺点:
  • MySQL:采集的 Tracing 信息基本都在每天上亿行甚至百亿行以上,MySQL 无法支撑这么大数据量。
  • Cassandra:能支持对单个 Trace 的 Span 信息分析,但对聚合查询等数据统计分析场景支持不好
  • ElasticSearch:能支持单个 Trace 的分析和简单的聚合查询分析,但对于一些较复杂的数据分析计算不能很好的支持,比如涉及到 Join、窗口函数等等的计算需求,尤其是任务间依赖计算,Zipkin 目前还不能实时计算,需要通过离线跑 Spark 任务计算任务间依赖信息。
我们在实践中也是首先使用 ElasticSearch,发现了上面提到的问题,比如 Zipkin 的服务依赖拓扑必须使用离线方式计算,便新增了 StarRocks 作为底层数据存储。将 Zipkin 的 trace 数据导入到StarRocks很方便,基本步骤只需要两步,CREATE TABLE + CREATE ROUTINE LOAD。
另外,在调用链路性能瓶颈分析场景中,要将单个服务看作黑盒,只关注 RPC SPAN,屏蔽掉服务内部的 Messaging Span,使用了 Flink 对服务内部 span 进行 ParentID 溯源,即从 RPC Client SPAN,一直追溯到同一服务同一 Trace ID 的 RPC Server SPAN,用 RPC Server SPAN 的 ID 替换 RPC Client SPAN 的parentId,最后通过Flink-Connector-StarRocks将转换后的数据实时写入StarRocks。
基于 StarRocks 的数据存储架构流程如下图所示。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 11
CREATE TABLE 建表语句示例参考如下,有如下几点注意点:
  • 包括 Zipkin 和 zipkin_trace_perf 两张表,zipkin_trace_perf 表只用于调用链路性能瓶颈分析场景,其他统计分析都适用 Zipkin 表
  • 通过采集信息中的 Timestamp 字段,生成 dt、hr、min 时间字段,便于后续统计分析
  • 采用 DUPLICATE 模型、Bitmap 索引等设置,加快查询速度
  • Zipkin 表使用id作为分桶字段,在查询服务拓扑时,查询计划会优化为 Colocate Join,提升查询性能。
Zipkin
CREATE TABLE `zipkin` ( `traceId` varchar(24) NULL COMMENT "", `id` varchar(24) NULL COMMENT "Span ID", `localEndpoint_serviceName` varchar(512) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `parentId` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `hr` int(11) NULL COMMENT "", `min` bigint(20) NULL COMMENT "", `kind` varchar(16) NULL COMMENT "", `duration` int(11) NULL COMMENT "", `name` varchar(300) NULL COMMENT "", `localEndpoint_ipv4` varchar(16) NULL COMMENT "", `remoteEndpoint_ipv4` varchar(16) NULL COMMENT "", `remoteEndpoint_port` varchar(16) NULL COMMENT "", `shared` int(11) NULL COMMENT "", `tag_error` int(11) NULL DEFAULT "0" COMMENT "", `error_msg` varchar(1024) NULL COMMENT "", `tags_http_path` varchar(2048) NULL COMMENT "", `tags_http_method` varchar(1024) NULL COMMENT "", `tags_controller_class` varchar(100) NULL COMMENT "", `tags_controller_method` varchar(1024) NULL COMMENT "", INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT '' ) ENGINE=OLAP DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) (PARTITION p20220104 VALUES [("20220104"), ("20220105")), PARTITION p20220105 VALUES [("20220105"), ("20220106"))) DISTRIBUTED BY HASH(`id`) BUCKETS 100 PROPERTIES ( "replication_num" = "3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "DAY", "dynamic_partition.time_zone" = "Asia/Shanghai", "dynamic_partition.start" = "-30", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "100", "in_memory" = "false", "storage_format" = "DEFAULT" );

zipkin_trace_perf
CREATE TABLE `zipkin_trace_perf` ( `traceId` varchar(24) NULL COMMENT "", `id` varchar(24) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `parentId` varchar(24) NULL COMMENT "", `localEndpoint_serviceName` varchar(512) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `hr` int(11) NULL COMMENT "", `min` bigint(20) NULL COMMENT "", `kind` varchar(16) NULL COMMENT "", `duration` int(11) NULL COMMENT "", `name` varchar(300) NULL COMMENT "", `tag_error` int(11) NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) (PARTITION p20220104 VALUES [("20220104"), ("20220105")), PARTITION p20220105 VALUES [("20220105"), ("20220106"))) DISTRIBUTED BY HASH(`traceId`) BUCKETS 32 PROPERTIES ( "replication_num" = "3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "DAY", "dynamic_partition.time_zone" = "Asia/Shanghai", "dynamic_partition.start" = "-60", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "12", "in_memory" = "false", "storage_format" = "DEFAULT" );

ROUTINE LOAD ROUTINE LOAD 创建语句示例如下:
CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS( id, kind, localEndpoint_serviceName, traceId, `name`, `timestamp`, `duration`, `localEndpoint_ipv4`, `remoteEndpoint_ipv4`, `remoteEndpoint_port`, `shared`, `parentId`, `tags_http_path`, `tags_http_method`, `tags_controller_class`, `tags_controller_method`, tmp_tag_error, tag_error = if(`tmp_tag_error` IS NULL, 0, 1), error_msg = tmp_tag_error, dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'), hr = from_unixtime(`timestamp` / 1000000, '%H'), `min` = from_unixtime(`timestamp` / 1000000, '%i') ) PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "50", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "max_error_number" = "1000000", "strict_mode" = "false", "format" = "json", "strip_outer_array" = "true", "jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]" ) FROM KAFKA ( "kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3", "kafka_topic" = "XXXXXXXXX" );

Flink 溯源 Parent ID 针对调用链路性能瓶颈分析场景中,使用 Flink 进行 Parent ID 溯源,代码示例如下:
env // 添加kafka数据源 .addSource(getKafkaSource()) // 将采集到的Json字符串转换为JSONArray, // 这个JSONArray是从单个服务采集的信息,里面会包含多个Trace的Span信息 .map(JSON.parseArray(_)) // 将JSONArray转换为JSONObject,每个JSONObejct就是一个Span .flatMap(_.asScala.map(_.asInstanceOf[JSONObject])) // 将Span的JSONObject对象转换为Bean对象 .map(jsonToBean(_)) // 以traceID+localEndpoint_serviceName作为key对span进行分区生成keyed stream .keyBy(span => keyOfTrace(span)) // 使用会话窗口,将同一个Trace的不同服务上的所有Span,分发到同一个固定间隔的processing-time窗口 // 这里为了实现简单,使用了processing-time session窗口,后续我们会使用starrocks的UDAF函数进行优化,去掉对Flink的依赖 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) // 使用Aggregate窗口函数 .aggregate(new TraceAggregateFunction) // 将经过溯源的span集合展开,便于调用flink-connector-starrocks .flatMap(spans => spans) // 使用flink-connector-starrocks sink,将数据写入starrocks中 .addSink( StarRocksSink.sink( StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))

分析计算
以 图 6 作为一个微服务系统用例,给出各个统计分析场景对应的 StarRocks SQL 语句。
服务内分析 上游服务请求指标统计
下面的 SQL 使用 Zipkin 表数据,计算服务 Service2 请求上游服务 Service3 和上游服务 Service4 的查询统计信息,按小时和接口分组统计查询指标
select hr, name, req_count, timeout / req_count * 100 as timeout_rate, error_count / req_count * 100 as error_rate, avg_duration, tp95, tp99 from ( select hr, name, count(1) as req_count, AVG(duration) / 1000 as avg_duration, sum(if(duration > 200000, 1, 0)) as timeout, sum(tag_error) as error_count, percentile_approx(duration, 0.95) / 1000 AS tp95, percentile_approx(duration, 0.99) / 1000 AS tp99 from zipkin where localEndpoint_serviceName = 'Service2' and kind = 'CLIENT' and dt = 20220105 group by hr, name ) tmp order by hr

下游服务响应指标统计
下面的 SQL 使用 Zipkin 表数据,计算服务 Service2 响应下游服务 Service1 的查询统计信息,按小时和接口分组统计查询指标。
select hr, name, req_count, timeout / req_count * 100 as timeout_rate, error_count / req_count * 100 as error_rate, avg_duration, tp95, tp99 from ( select hr, name, count(1) as req_count, AVG(duration) / 1000 as avg_duration, sum(if(duration > 200000, 1, 0)) as timeout, sum(tag_error) as error_count, percentile_approx(duration, 0.95) / 1000 AS tp95, percentile_approx(duration, 0.99) / 1000 AS tp99 from zipkin where localEndpoint_serviceName = 'Service2' and kind = 'SERVER' and dt = 20220105 group by hr, name ) tmp order by hr

服务内部处理分析
下面的 SQL 使用 Zipkin 表数据,查询服务 Service2 的接口 /2/api,按 Span Name 分组统计 Duration 等信息。
with spans as ( select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2" ), api_spans as ( select spans.id as id, spans.parentId as parentId, spans.name as name, spans.duration as duration from spans inner JOIN (select * from spans where kind = "SERVER" and name = "/2/api") tmp on spans.traceId = tmp.traceId ) SELECT name, AVG(inner_duration) / 1000 as avg_duration, percentile_approx(inner_duration, 0.95) / 1000 AS tp95, percentile_approx(inner_duration, 0.99) / 1000 AS tp99 from ( select l.name as name, (l.duration - ifnull(r.duration, 0)) as inner_duration from api_spans l left JOIN api_spans r on l.parentId = r.id ) tmp GROUP BY name

服务间分析 服务拓扑统计
下面的 SQL 使用 Zipkin 表数据,计算服务间的拓扑关系,以及服务间接口 Duration 的统计信息。
with tbl as (select * from zipkin where dt = 20220105) select client, server, name, AVG(duration) / 1000 as avg_duration, percentile_approx(duration, 0.95) / 1000 AS tp95, percentile_approx(duration, 0.99) / 1000 AS tp99 from ( select c.localEndpoint_serviceName as client, s.localEndpoint_serviceName as server, c.name as name, c.duration as duration from (select * from tbl where kind = "CLIENT") c left JOIN (select * from tbl where kind = "SERVER") s on c.id = s.id and c.traceId = s.traceId ) as tmp group by client, server, name

调用链路性能瓶颈分析
下面的 SQL 使用 zipkin_trace_perf 表数据,针对某个服务接口响应超时的查询请求,统计出每次请求的调用链路中处理耗时最长的服务或服务间调用,进而分析出性能热点是在某个服务或服务间调用。
select service, ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent from ( select traceId, service, duration, ROW_NUMBER() over(partition by traceId order by duration desc) as rank4 from ( with tbl as ( SELECT l.traceId as traceId, l.id as id, l.parentId as parentId, l.kind as kind, l.duration as duration, l.localEndpoint_serviceName as localEndpoint_serviceName FROM zipkin_trace_perf l INNER JOIN zipkin_trace_perf r on l.traceId = r.traceId and l.dt = 20220105 and r.dt = 20220105 and r.tag_error = 0-- 过滤掉出错的trace and r.localEndpoint_serviceName = "Service1" and r.name = "/1/api" and r.kind = "SERVER" and r.duration > 200000-- 过滤掉未超时的trace ) select traceId, id, service, duration from ( select traceId, id, service, (c_duration - s_duration) as duration, ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2 from ( select c.traceId as traceId, c.id as id, concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service, c.duration as c_duration, ifnull(s.duration, 0) as s_duration from (select * from tbl where kind = "CLIENT") c left JOIN (select * from tbl where kind = "SERVER") s on c.id = s.id and c.traceId = s.traceId ) tmp1 ) tmp2 where rank2 = 1 union ALL select traceId, id, service, duration from ( select traceId, id, service, (s_duration - c_duration) as duration, ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2 from ( select s.traceId as traceId, s.id as id, s.localEndpoint_serviceName as service, s.duration as s_duration, ifnull(c.duration, 0) as c_duration, ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank from (select * from tbl where kind = "SERVER") s left JOIN (select * from tbl where kind = "CLIENT") c on s.id = c.parentId and s.traceId = c.traceId ) tmp1 where rank = 1 ) tmp2 where rank2 = 1 ) tmp3 ) tmp4 where rank4 = 1 GROUP BY service order by percent desc

SQL 查询的结果如下图所示,在超时的 Trace 请求中,性能瓶颈服务或服务间调用的比例分布。
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 12
03 实践效果 目前搜狐智能媒体已在 30+ 个服务中接入 Zipkin,涵盖上百个线上服务实例,1% 的采样率每天产生近 10亿 多行的日志。
通过 Zipkin Server 查询 StarRocks,获取的 Trace 信息如下图所示:
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 13
通过 Zipkin Server 查询 StarRocks,获取的服务拓扑信息如下图所示:
万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践
文章图片

图 14
基于 Zipkin StarRocks 的链路追踪体系实践过程中,明显提升了微服务监控分析能力和工程效率:
提升微服务监控分析能力
  • 在监控报警方面,可以基于 StarRocks 查询统计线上服务当前时刻的响应延迟百分位数、错误率等指标,根据这些指标及时产生各类告警;
  • 在指标统计方面,可以基于 StarRocks 按天、小时、分钟等粒度统计服务响应延迟的各项指标,更好的了解服务运行状况;
  • 在故障分析方面,基于 StarRocks 强大的 SQL 计算能力,可以进行服务、时间、接口等多个维度的探索式分析查询,定位故障原因。
提升微服务监控工程效率
Metric 和 Logging 数据采集,很多需要用户手动埋点和安装各种采集器 Agent,数据采集后存储到 ElasticSearch 等存储系统,每上一个业务,这些流程都要操作一遍,非常繁琐,且资源分散不易管理。
而使用 Zipkin + StarRocks 的方式,只需在代码中引入对应库 SDK,设置上报的 Kafka 地址和采样率等少量配置信息,Tracing 便可自动埋点采集,通过 zikpin server 界面进行查询分析,非常简便。
04 总结与展望 【万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践】基于 Zipkin+StarRocks 构建链路追踪系统,能够提供微服务监控的 Monitoring 和 Observability 能力,提升微服务监控的分析能力和工程效率。
后续有几个优化点,可以进一步提升链路追踪系统的分析能力和易用性:
  1. 使用 StarRocks 的 UDAF、窗口函数等功能,将 Parent ID 溯源下沉到 StarRocks计算,通过计算后置的方式,取消对 Flink 的依赖,进一步简化整个系统架构。
  2. 目前对原始日志中的 tag s等字段,并没有完全采集,StarRocks 正在实现 Json 数据类型,能够更好的支持 tags 等嵌套数据类型。
  3. Zipkin Server 目前的界面还稍显简陋,我们已经打通了 Zipkin Server 查询 StarRokcs,后续会对 Zipkin Server 进行 U I等优化,通过 StarRocks 强大的计算能力实现更多的指标查询,进一步提升用户体验。
05 参考文档
  1. 《云原生计算重塑企业IT架构 - 分布式应用架构》:
    https://developer.aliyun.com/article/717072
  2. What is Upstream and Downstream in Software Development?
    https://reflectoring.io/upstream-downstream/
  3. Metrics, tracing, and logging:
    https://peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html
  4. The 3 pillars of system observability:logs, metrics and tracing:
    https://iamondemand.com/blog/the-3-pillars-of-system-observability-logs-metrics-and-tracing/
  5. observability 3 ways: logging, metrics and tracing:
    https://speakerdeck.com/adriancole/observability-3-ways-logging-metrics-and-tracing
  6. Dapper, a Large-Scale Distributed Systems Tracing Infrastructure:
    https://static.googleusercontent.com/media/research.google.com/en//archive/papers/dapper-2010-1.pdf
  7. Jaeger:www.jaegertracing.io
  8. Zipkin:https://zipkin.io/
  9. opentracing.io:
    https://opentracing.io/docs/
  10. opencensus.io:
    https://opencensus.io/
  11. opentelemetry.io:
    https://opentelemetry.io/docs/
  12. Microservice Observability, Part 1: Disambiguating Observability and Monitoring:
    https://bravenewgeek.com/microservice-observability-part-1-disambiguating-observability-and-monitoring/
  13. How to Build Observable Distributed Systems:
    https://www.infoq.com/presentations/observable-distributed-ststems/
  14. Monitoring and Observability:
    https://copyconstruct.medium.com/monitoring-and-observability-8417d1952e1c
  15. Monitoring Isn't Observability:
    https://orangematter.solarwinds.com/2017/09/14/monitoring-isnt-observability/
  16. Spring Cloud Sleuth Documentation:
    https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/getting-started.html#getting-started

    推荐阅读