Flink|Flink 实时 metrics
Flink 实时 metrics
目前我们的 flink 任务跑在 yarn 集群上,在面对以下问题时
- 常驻实时 job 是否在稳定运行?
- 实时数据的处理能力如何?消费过慢?是否需要申请更多资源提升消费能力?
- 实时数据质量可靠?是否有丢数据的风险?
- 实时任务现有的资源是否足够支撑现有的数据量?资源是否闲置浪费?
首先介绍下 Flink Metric
- Metric Types
- Counter: 表示收集的数据是按照某个趋势(增加/减少)一直变化的
- Gauge: 表示搜集的数据是一个瞬时的值,与时间没有关系,可以任意变高变低,往往可以用来记录内存使用率、磁盘使用率等。
- Histogram: 统计数据的分布情况。
- Meter:度量一系列事件发生的速率(rate)。
- Metric Reporters
- Metrics 信息可以通过 flink-conf.yaml 配置,在 job 启动的时候实时上报到外部系统上。
- System Metrics
- Flink 内部会预定义一些 Metrics 指标信息,包含 CPU,Memory, IO,Thread,Network,JVM GarbageCollection 等信息
- User Defined Metrics
- 用户可以自己根据自己的业务需要,自定义一些监控指标
val counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
Metric 监控搭建
- 梳理监控指标
a. 系统指标
- job 数量的监控
- 常驻 job 数量的监控
- 及时发现 job 运行过程中的重启,失败问题
- 算子消息处理的 numRecordsIn 和 numRecordsOut
- 线图趋势掌握任务处理的负载量
- 及时发现job资源分配是否合理,尽量避免消息波动带来的系统延迟增高
- 消息延迟监控
- Flink 算子之间消息传递的最大,最小,平均延迟。
- 及时发现任务消息的处理效率波动
- 内存,JVM GC 的状态
- taskmanager 的内存,GC 状态的线图波动。
- 及时发现系统中资源的利用率,合理分配集群资源。
- Source 端我们采用 kafka 作为数据的输入源
a. 通过监控 kafka consumer group 的 lagOffset 来发现flow 的数据消费能力是否有降低。 - Sink 端我们自己实现了 clickhouse,hbase,hive,kafka 等多端输出,为了避免 Flink 的流式处理对 Sink 终端造成过大的写入压力,我们抽象了一个批次的 buffer cache,当数据的批次达到了阀值,或者 buffer cache 一定的时间间隔,就将 buffer cache 内的数据一次性 doFlush 到各端存储, 各个 sink实例 只需实现 BucketBufferedSink.doFlush 方法
- job 数量的监控
- sinkPushCounter 统计进入到 buffercache 的数据条数
- sinkFlushCounter 统计 buffercache flush 出去的数据条数 *
- 实施搭建监控系统
a. 系统部署图
文章图片
b. 在 flink-conf.yml 中 配置 flink metrics reporter,可让 flink 自动的上报 metric 信息
# metrics configuration metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.grph.host: ${host} metrics.reporter.grph.port: ${port} metrics.reporter.grph.protocol: TCP # 运行时指定 metrics.reporter.grph.prefix="flink.${JOB_NAME}"metrics.latency.interval: 30000
- 通过 flink run -yD metrics.reporter.grph.prefix="${JOB_NAME}" 的方式可动态指定各个实时任务的监控进行分组。
- 通过 metrics.latency.interval: 30000 设置每 30s flink 自动上报算子之间的延迟信息。
- 通过配置 mapping.yml 转化为有 label 维度的 Prometheus 数据,推送给 Prometheus
mappings:
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.Status\.(\w+)\.(\w+)\.([\w-]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_Status_${4}_${5}_${6}_${7}
labels:
host: $2
container: $3
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.Shuffle\.Netty\.(.*)'
match_type: regex
action: drop
name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.(.*)\.(Buffers|buffers)\.(.*)$'
match_type: regex
action: drop
name: dropped
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w]+)\-([\w]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_operator_${7}_${9}
labels:
host: $2
container: $3
job_name: $1
operator: $5
task: $6
custom_metric: $7
sink_instance: $8
- match: 'flink\.([\w-]+)\.(.*)\.taskmanager\.(\w+)\.([\w-]+)\.(.+)\.(\d+)\.([\w-]+)\.(\w+)'
match_type: regex
name: flink_taskmanager_operator_${7}_${8}
labels:
host: $2
container: $3
job_name: $1
operator: $5
task: $6
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager\.Status\.(.*)'
match_type: regex
name: flink_jobmanager_Status_$3
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(\w+)$'
match_type: regex
name: flink_jobmanager_${3}
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(\w+)'
match_type: regex
name: flink_jobmanager_${4}
labels:
host: $2
job_name: $1
- match: 'flink\.([\w-]+)\.(.*)\.jobmanager.(.*)\.(.*)\.(.*)'
match_type: regex
name: flink_jobmanager_${4}_${5}
labels:
host: $2
job_name: $1
- match: "."
match_type: regex
action: drop
name: "dropped"
实时监控看板展示
- 通过 kafka lag 及时发现数据堆积导致的消费延迟。
文章图片
- 通过检测在线运行 Job 数量,及时发现Job运行失败的问题。
文章图片
- 通过统计 Source 端 和 Sink 端的消息处理速度,及时反应当前任务的处理能力。
文章图片
- 通过消息的延时指标,发现 Job 的流处理的响应延迟。
文章图片
- 通过 Jvm 内存及 GC 状态,合理分配系统资源。
文章图片
- 通过Sink 算子的 Push to BufferCache 数量与 BufferCache Flush 到各端存储数量的对比,及时发现数据丢失问题。
文章图片
- https://ci.apache.org/project...
- https://github.com/prometheus...
- https://prometheus.io/docs/in...
- https://grafana.com/docs/?plc...
推荐阅读
- Elasticsearch|Elasticsearch 简介
- 实时|实时 OLAP 系统 Druid
- SRS(简单实时视频服务)|SRS(简单实时视频服务) 笔记(1)- 体验
- 有趣的oython|python疲劳驾驶实时检测项目讲解(附代码)
- Flink总结-运行命令参数分析
- 【Flink】Flink手动触发savepoint失败问题
- 百度爱番番实时CDP建设实践
- Injection|Injection For Xcode11 macOS 10.15 Catalina 亲测可用iOS模拟器UI界面调试实时刷新工具
- 读Flink源码谈设计(Exactly Once)
- TiDB Cloud 上线亚马逊云科技 Marketplace,为全球用户提供云端一栈式实时 HTAP 数据库体验