丈夫志四海,万里犹比邻。这篇文章主要讲述Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)相关的知识,希望能为你提供帮助。
文章图片
本篇文章涉及底层设计以及原理,以及问题定位和可能的问题点,非常深入,篇幅较长,所以拆分成上中下三篇:
- 上:问题简单描述以及 Spring Cloud Gateway 基本结构和流程以及底层原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的链路追踪以及为何会出现这个问题
- 下:现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其他可能的问题点,以及如何解决
@Override
public void subscribe(CoreSubscriber<
? super Void>
subscriber) {
Context context = contextWithoutInitialSpan(subscriber.currentContext());
Span span = findOrCreateSpan(context);
//将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC
//日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map
//简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
//将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span
this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
}
//在 scope.close() 之后,会将链路信息从ThreadLocal 的 Map 中剔除
}@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) {
//执行的方式必须是不能切换线程,也就是同步的
//因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了
return Attr.RunStyle.SYNC;
}
return super.scanUnsafe(key);
}
WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。
经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。
我们应用中丢失链路信息的地方通过查看日志我们发现,启用 RequestBody 缓存的地方,都有链路缺失。这个 RequestBody 缓存我们使用的是 Spring Cloud Gateway 中的
AdaptCachedBodyGlobalFilter
,其核心源码是:private static <
T>
Mono<
T>
cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,
Function<
ServerHttpRequest, Mono<
T>
>
function) {
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
return
//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起
DataBufferUtils.join(exchange.getRequest().getBody())
//如果没有 Body,则直接返回空 DataBuffer
.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body
//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路
.map(dataBuffer ->
decorate(exchange, dataBuffer, cacheDecoratedRequest))
.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
}
为何会使用这个
AdaptCachedBodyGlobalFilter
呢?获取请求 Body 是通过 exchange.getRequest().getBody()
获取的,其结果是一个 Flux&
lt;
DataBuffer&
gt;
.请求的 Body 是一次性的,如果你需要请求重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 已经结束。所以,对于需要重复调用,例如重试,一对多路由转发的情况,需要将请求 Body 缓存起来,就是经过这个 GatewayFilter。但是经过这个 GatewayFilter 之后,链路信息就没了,可以通过以下这个简单项目进行复现(项目地址):引入依赖:
<
parent>
<
groupId>
org.springframework.boot<
/groupId>
<
artifactId>
spring-boot-starter-parent<
/artifactId>
<
version>
2.4.6<
/version>
<
/parent>
<
dependencies>
<
dependency>
<
groupId>
org.springframework.cloud<
/groupId>
<
artifactId>
spring-cloud-starter-gateway<
/artifactId>
<
/dependency>
<
dependency>
<
groupId>
org.springframework.cloud<
/groupId>
<
artifactId>
spring-cloud-starter-sleuth<
/artifactId>
<
/dependency>
<
dependency>
<
groupId>
org.springframework.boot<
/groupId>
<
artifactId>
spring-boot-starter-log4j2<
/artifactId>
<
/dependency>
<
!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->
<
dependency>
<
groupId>
com.lmax<
/groupId>
<
artifactId>
disruptor<
/artifactId>
<
version>
${disruptor.version}<
/version>
<
/dependency>
<
/dependencies>
<
dependencyManagement>
<
dependencies>
<
dependency>
<
groupId>
org.springframework.cloud<
/groupId>
<
artifactId>
spring-cloud-dependencies<
/artifactId>
<
version>
2020.0.3<
/version>
<
type>
pom<
/type>
<
scope>
import<
/scope>
<
/dependency>
<
/dependencies>
<
/dependencyManagement>
对所有路径开启
AdaptCachedBodyGlobalFilter
:@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
@Autowired
private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
@Autowired
private GatewayProperties gatewayProperties;
@PostConstruct
public void init() {
gatewayProperties.getRoutes().forEach(routeDefinition ->
{
//对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
});
}
}
配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):
server:
port: 8181
spring:
application:
name: apiGateway
cloud:
gateway:
httpclient:
connect-timeout: 500
response-timeout: 60000
routes:
- id: first_route
uri: http://httpbin.org
predicates:
- Path=/httpbin/**
filters:
- StripPrefix=1
添加两个全局 Filter,一个在
AdaptCachedBodyGlobalFilter
之前,一个在 AdaptCachedBodyGlobalFilter
之后。这两个 Filter 非常简单,只是打一行日志。@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;
@Override
public Mono<
Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("before AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}@Override
public int getOrder() {
return ORDER;
}
}@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;
@Override
public Mono<
Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("after AdaptCachedBodyGlobalFilter");
return chain.filter(exchange);
}@Override
public int getOrder() {
return ORDER;
}
}
最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。
启动这个应用,之后访问
http://127.0.0.1:8181/httpbin/anything
,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:2021-09-08 06:32:35.457INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
为何链路信息会丢失我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function<
Route, Mono<
?>
>
) r ->
{
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
//将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler);
//返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() ->
{ //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(
Mono.defer(() ->
{
//省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套
//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起
DataBufferUtils.join(exchange.getRequest().getBody())
//如果没有 Body,则直接返回空 DataBuffer
.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body
//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路
.map(dataBuffer ->
decorate(exchange, dataBuffer, cacheDecoratedRequest))
.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
})
.then(Mono.empty()))
), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) ->
{
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);
其中
DataBufferUtils.join(exchange.getRequest().getBody())
其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下://首先我们创建一个新的 Span
Span span = tracer.newTrace();
//声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperator
class MonoWebFilterTrace<
T>
extends MonoOperator<
T, T>
{
protected MonoWebFilterTrace(Mono<
? extends T>
source) {
super(source);
}@Override
public void subscribe(CoreSubscriber<
? super T>
actual) {
//将 subscribe 用 span 包裹
try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {
source.subscribe(actual);
//在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志
log.info("stopped");
}
}
}Mono.defer(() ->
new MonoWebFilterTrace(
Mono.fromRunnable(() ->
{
log.info("first");
})
//模拟 FluxReceive
.then(Mono.delay(Duration.ofSeconds(1))
.doOnSuccess(longSignal ->
log.info(longSignal))))
).subscribe(aLong ->
log.info(aLong));
Mono.delay
和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:2021-09-08 07:12:45.236INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。
【Spring Cloud Gateway 没有链路信息,我 TM 人傻了(中)】
文章图片
推荐阅读
- Linux嵌入式所有知识点-思维导图-一口君吐血奉献
- 1分钟配置Kubectl命令自动补全功能
- 十zabbix监控TCP连接状态
- QT数据库访问技术简介
- 十一zabbix主动发现和自动注册
- 如何在WordPress中进行真实页面缓存()
- 如何在不使用短代码的情况下使插件与自定义wp网站一起使用()
- 如何在WordPress中使客户计费电话号码唯一
- 默认情况下如何在WordPress中将菜单添加到菜单位置