努力尽今夕,少年犹可夸。这篇文章主要讲述Spring Cloud Gateway 没有链路信息,我 TM 人傻了(下)相关的知识,希望能为你提供帮助。
文章图片
本篇文章涉及底层设计以及原理,以及问题定位和可能的问题点,非常深入,篇幅较长,所以拆分成上中下三篇:
- 上:问题简单描述以及 Spring Cloud Gateway 基本结构和流程以及底层原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的链路追踪以及为何会出现这个问题
- 下:现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其他可能的问题点,以及如何解决
1.在 GatewayFilter 中指定了异步执行某些任务,由于线程切换了,并且这时候可能 Span 已经结束了,所以没有链路信息,例如:
@Override
public Mono<
Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o ->
{
//这里就没有链路信息了
log.info("success");
});
}
2.将 GatewayFilter 中继续链路的
chain.filter(exchange)
放到了异步任务中执行,上面的 AdaptCachedBodyGlobalFilter 就属于这种情况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:@Override
public Mono<
Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}
java 并发编程模型与 Project Reactor 编程模型的冲突思考Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:
- 日志框架中的 MDC,一般都是 ThreadLocal 实现。
- 所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来唯一标识谁获取到了锁的。
- 分布式锁等数据结构,也是通过 Thread 的属性来唯一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。
运行不切换线程,这样其实限制了 Project Reactor 的灵活调度,是有一些性能损失的。我们其实想尽量就算加入了链路追踪信息,也不用强制运行不切换线程。但是 Spring Cloud Sleuth 是非侵入式设计,很难实现这一点。但是对于我们自己业务的使用,我们可以定制一些编程规范,来保证大家写的代码不丢失链路信息。
改进我们的编程规范首先,我们自定义 Mono 和 Flux 的工厂
公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。
public class TracedCoreSubscriber<
T>
implements Subscriber<
T>
{
private final Subscriber<
T>
delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedCoreSubscriber(Subscriber<
T>
delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}@Override
public void onSubscribe(Subscription s) {
executeWithinScope(() ->
{
delegate.onSubscribe(s);
});
}@Override
public void onError(Throwable t) {
executeWithinScope(() ->
{
delegate.onError(t);
});
}@Override
public void onComplete() {
executeWithinScope(() ->
{
delegate.onComplete();
});
}@Override
public void onNext(T o) {
executeWithinScope(() ->
{
delegate.onNext(o);
});
}private void executeWithinScope(Runnable runnable) {
//如果当前没有链路信息,强制包裹
if (tracer.currentSpan() == null) {
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
runnable.run();
}
} else {
//如果当前已有链路信息,则直接执行
runnable.run();
}
}
}
之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:
public class TracedFlux<
T>
extends Flux<
T>
{
private final Flux<
T>
delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedFlux(Flux<
T>
delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}@Override
public void subscribe(CoreSubscriber<
? super T>
actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}public class TracedMono<
T>
extends Mono<
T>
{
private final Mono<
T>
delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedMono(Mono<
T>
delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}@Override
public void subscribe(CoreSubscriber<
? super T>
actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。
@Component
public class TracedPublisherFactory {
protected static final String TRACE_REQUEST_ATTR = Span.class.getName();
@Autowired
private Tracer tracer;
@Autowired
private CurrentTraceContext currentTraceContext;
public <
T>
Flux<
T>
getTracedFlux(Flux<
T>
publisher, ServerWebExchange exchange) {
return new TracedFlux<
>
(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}public <
T>
Mono<
T>
getTracedMono(Mono<
T>
publisher, ServerWebExchange exchange) {
return new TracedMono<
>
(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
}
然后,我们规定:1. 所有的 GatewayFilter,需要继承我们自定义的抽象类,这个抽象类仅仅是把 filter 的结果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:
public abstract class AbstractTracedFilter implements GlobalFilter {
@Autowired
protected TracedPublisherFactory tracedPublisherFactory;
@Override
public Mono<
Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
}protected abstract Mono<
Void>
traced(ServerWebExchange exchange, GatewayFilterChain chain);
}
2. GatewayFilter 中新生成的 Flux 或者 Mono,统一使用 TracedPublisherFactory 再封装一层。
3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路丢失,我向社区提了一个 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以参考。也可以在这个 Filter 之前自己将 Request Body 使用 TracedPublisherFactory 进行封装解决。
文章图片
推荐阅读
- 2021中国十大软件公司排名-工业软件上市公司排行榜
- 项管行知07--工作分解
- 三网免挂轮询??码支付??,QQ支付宝微信免挂商业版源码
- JVM垃圾回收机制是怎样的,何时触发YoungGC或FullGC操作
- 如何在WordPress网站上删除自动生成的广告()
- 如何从WordPress中的自定义帖子类型的编辑器页面中删除”添加新”按钮()
- 如何减少WordPress中header和iframe之间的空间()
- 如何减少我网站上搜索栏的长度和宽度( [关闭])
- 如何在wordpress小部件中仅查询一个帖子