读Flink源码谈设计(Metric)
本文首发于 泊浮目的简书: https://www.jianshu.com/u/204...
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2021.10.8 | 文章首发 |
本文的源码基于FLink
1.13.2
。
1. 扩展插件化
在官网中,FLink社区自己提供了一些已接入的Repoter,如果我们有自己定制的Reporter,也可以根据它的规范去实现自己的Repoter。在FLink的代码中,提供了反射机制实例化MetricReporter:要求MetricReporter的实现类必须是
public
的访问修饰符,不能是抽象类,必须有一个无参构造函数。核心代码为
RepoterSetup#getAllReporterFactories
:private static Iterator getAllReporterFactories(
@Nullable PluginManager pluginManager) {
final Iterator factoryIteratorSPI =
ServiceLoader.load(MetricReporterFactory.class).iterator();
final Iterator factoryIteratorPlugins =
pluginManager != null
? pluginManager.load(MetricReporterFactory.class)
: Collections.emptyIterator();
return Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);
}
【读Flink源码谈设计(Metric)】该代码会通过Java的SPI机制来获取MetricReporter的相关实现类,本质上是通过ClassLoder来获取。
|-- ReporterSetup
\-- fromConfiguration //当集群启动时,会从配置读取监控并初始化相关类
\-- loadAvailableReporterFactories // 加载有效的Reporter们
\-- getAllReporterFactories //核心代码,通过SPI以及ClassLoader机制获取Repoter们
2. 内置松耦合 上文提到了社区会提供常见的一些监控Repoter。在代码中,本质是工厂模式的实现。
/**
* {@link MetricReporter} factory.
*
* Reporters that can be instantiated with a factory automatically qualify for being loaded as a
* plugin, so long as the reporter jar is self-contained (excluding Flink dependencies) and contains
* a {@code META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory} file
* containing the qualified class name of the factory.
*
* Reporters that previously relied on reflection for instantiation can use the {@link
* InstantiateViaFactory} annotation to redirect reflection-base instantiation attempts to the
* factory instead.
*/
public interface MetricReporterFactory {/**
* Creates a new metric reporter.
*
* @param properties configured properties for the reporter
* @return created metric reporter
*/
MetricReporter createMetricReporter(final Properties properties);
}
每接入一个监控,只要实现相应的工厂方法即可。目前实现的有:
- org.apache.flink.metrics.graphite.GraphiteReporterFactory
- org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
- org.apache.flink.metrics.prometheus.PrometheusReporter
- org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
- org.apache.flink.metrics.statsd.StatsDReporterFactory
- org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
- org.apache.flink.metrics.slf4j.Slf4jReporterFactory
MetricReporterFactory
即可,而上层能感知到的也仅仅是MetricReporter
,和任何具体实现无关,这也是典型的一种防腐设计。文章图片
3. Fail safe 在流计算业务中,如果监控这种旁路逻辑发生问题,是否应该影响到主干逻辑呢?答案是不应该的。
在
MetricRegistryImpl
中(顾名思义,它会将所有的Repoter注册进这个类),构造函数会将相关的MetricReporter
放到线程池中,定期的让它们上报数据。|-- MetricRegistryImpl
\-- constructor
在
WebMonitorEndpoint
中,也有线程池的身影。这个类提供了RestAPI来便于查询Metric。对于其他组件的请求通过Akka来异步发送,并通过线程池来处理这些回调的回复。|-- WebMonitorEndpoint
\-- start
\-- initializeHandlers
\--new JobConfigHandler
|-- AbstractExecutionGraphHandler
\-- handleRequest
这是典型
Fail-safe
的设计。4. 不仅只支持Push 在FLink中,监控数据不仅支持Push,同时还实现了Pull,而实现也非常的简单。
MetricQueryService
实现了MetricQueryServiceGateway
,这意味着它可以被远程调用。其监控数据来源代码追踪:
|-- AbstractMetricGroup
\-- counter
|-- MetricRegistryImpl
\-- register
|-- MetricQueryService
\-- addMetric
上面提到的
WebMonitorEndpoint
也是一样,不过是基于RestAPI的实现,同样提供了Pull的策略。5. 参考资料
- https://nightlies.apache.org/...
- https://cwiki.apache.org/conf...
推荐阅读
- 考研英语阅读终极解决方案——阅读理解如何巧拿高分
- Ⅴ爱阅读,亲子互动——打卡第178天
- “成长”读书社群招募
- 上班后阅读开始变成一件奢侈的事
- 人间词话的智慧
- 读司马懿,知人间事,品百味人生
- 以读攻“毒”唤新活动曹彦斌打卡第二天
- 私通和背叛,他怎么看(——晨读小记)
- 【0212读书感悟】
- 历史教学书籍