利用Spark监听listener来监控任务完成进度
一、背景
当时在做数据湖的项目,需要使用Spark SQL做数据ETL,即并发地将全表数据从RDBMS经过数据转换等导入到HDFS中。由于Web UI上需要显示ETL的进度,因此需要能够指导当前导了多少个row。但是由于是多个executor并发地读取数据,而如何获取每个executor导了多少个row就是一个问题了,Spark SQL本身并没有提供这样的API。本文将介绍如何使用Spark监听listener来预估任务完成的进度。
二、实现方法
- 首先,自定义一个监听类,并继承SparkListener并override方法;
- 实例化该监听类得到监听器对象,sparkcontex添加该监听器对象即可。
import org.apache.spark.scheduler._
import org.slf4j.LoggerFactory/*
* This class is used to listen the progress of submitted spark job
* The number of completed tasks will be counted
* In this way, the rough progress of submitted spark job can be estimated
* */
class MySparkListener(instanceName:String,schemaName:String,tableName:String,ceilNum:Long,rowCount:Long,parallelismNum:Int,partitionNum:Int) extends SparkListener{val logger = LoggerFactory.getLogger(classOf[MySparkListener])
var taskCount: Int = 0
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
super.onApplicationStart(applicationStart)
logger.info("\n\n\n>>>>>> Spark application started")
}override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
super.onApplicationEnd(applicationEnd)
logger.info("\n\n\n>>>>>> Spark application ended")
}override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
super.onJobEnd(jobEnd)
}override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
super.onStageCompleted(stageCompleted)
}override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
super.onTaskEnd(taskEnd)
taskCount = taskCount + 1
if(taskCount <= parallelismNum + 1){
val sparkJobProgress = Math.floor(rowCount*(0.1+taskCount*0.76/(parallelismNum+1))).toLong
if(sparkJobProgress <= rowCount){
// 处理逻辑,更新进度......
}
}
}
}object Main {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("yarn").appName("Datalake")getOrCreate()
val sc = sparkSession.sparkContext
logger.info(">>>>>> start spark listener")
val sparkListener = newMySparkListener(instanceName,schemaName,tableName,ceilNum,rowCount,parallelismNum,partitionNum)
sc.addSparkListener(sparkListener)
【利用Spark监听listener来监控任务完成进度】如有错误,敬请指正!
推荐阅读
- Spark|Spark 数据倾斜及其解决方案
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- Spring|Spring Boot 自动配置的原理、核心注解以及利用自动配置实现了自定义 Starter 组件
- 【万伽复利】什么是复利(如何利用复利赚钱?)
- 苹果手机如何利用库乐队自制铃声
- vue_day05
- “没有利用价值的人是很受冷遇的。”
- 可悲的好人
- linux监听蒲公英线程,重启
- 利用好你的暗时间,让成长无处不在