OPPO大数据离线任务调度系统OFLOW

归志宁无五亩园,读书本意在元元。这篇文章主要讲述OPPO大数据离线任务调度系统OFLOW相关的知识,希望能为你提供帮助。
1 离线调度系统
在整个大数据体系中,在原始数据被采集之后,需要使用各种逻辑进行整合和计算之后才能输出实际有效的数据,才能最终用于商业目的,实现大数据的价值。在整个处理流程中,无论是抽取、转换、装载(ETL)的这些过程,还是数据用户分析处理过程,都是需要包含众多的处理任务,而且这些任务都不是孤立的,而是存在相互依赖和约束关系的。如何高效的调度和管理这些任务是非常关键的,影响到各个流程中的数据的及时性和准确性。在这个过程中任务的高效管理和调度是非常关键的,会影响到各个流程中的数据的及时性和准确性。

图1:数据ETL过程
一个最简单的任务调度系统莫过于Linux系统自带的crontab,使用简单,运行稳定。
?
图2:linux的cron定时任务
在项目刚起步时使用crontab无可厚非,随着调度任务的增多,相互之间又有着依赖,crontab就远远满足不了开发的需求了。这些任务的形态各种各样,任务之间也存在多种多样的依赖关系。一个任务的执行需要一系列的前置任务的完成。比如一个上游任务A完成特定逻辑之后,而下游的任务B则依赖任务A输出的数据结果才能产生自己的数据和结果。因此为了保证数据的准确性和可靠性,就必须根据这些任务之间的依赖关系从上游到下游有序的执行。怎么样让大量的任务准确的完成调度而不出现问题,甚至在任务调度执行中出现错误的情况下,任务能够完成自我恢复甚至执行错误告警与完整的日志查询。大数据离线任务调度系统就是要发挥这样的作用。
?
图3:一个简单的任务依赖图
调度系统的核心功能主要就是如下三点:
组织和管理任务流程,定时调度和执行任务,处理任务间依赖关系。
对于一个完善的离线调度系统,需要有以下核心功能:

  1. 作为大数据体系中的一个指挥中心,负责根据时间,依赖,任务优先级,资源等条件调度任务;
  2. 需要能处理任务的多种依赖关系,包括时间依赖,任务上下游依赖,自身依赖等;
  3. hive,spark以及shell,python等;
  4. 需要有一个完善的监控系统监控整个调度和执行的过程,保障任务调度和执行的整个链条,过程中出现异常情况能即使发送告警通知。
我们的OFLOW系统就是为了实现以上需求的。
2 OFLOW系统在OPPO的应用
OFLOW目前提供的核心功能主要以下几点:
  1. 高效准时的任务调度;
  2. 灵活的调度策略:时间,上下游依赖,任务自身依赖;
  3. 多种任务类型:数据集成、Hive、Python、java、MapReduce、Spark、SparkSQL、Sqoop、机器学习任务等;
  4. 业务间隔离,任务进程间隔离;
  5. 高可用,可扩展;
  6. 任务配置:参数,失败重试(次数,间隔),失败和超时告警,不同级别告警,任务回调;
  7. 丰富全面的操作页面,任务的开发、运维、监控等操作图形化页面化;
  8. 权限管理;
  9. 实时查看任务状态和分析日志,并进行停止、重跑、补录等各种运维操作;
  10. 任务历史数据分析;
  11. 脚本开发,测试,发布流程;
  12. 告警监控:多种异常情况的状态监控,灵活配置;
  13. 核心任务重点监控,保障准点率;
  14. 支持API接入。
【OPPO大数据离线任务调度系统OFLOW】目前OFLOW在我司已经承担了非常多的任务的调度。
OFLOW现有国内,新加坡,印度,欧盟和北美5大集群,欧盟和北美集群最近不久上线的,任务暂时还没上量。目前主力集群是国内,新加坡和印度。
目前用户可以通过以下几种方式接入到OFLOW:
  1. oflow的webserver;
  2. 南天门平台,其中的数据研发 - 离线任务模块,数据集成任务模块,离线脚本开发模块。后端的任务调度和执行全部也是在oflow系统上;
  3. oflow还支持通过api的方式接入,目前也已经有多个业务通过api的方式使用oflow系统;
图4:oflow服务接入方式
3 OFLOW系统的设计和演进
根据前面的信息,可以看到整个离线调度系统最核心的是两个组件,一个的调度引擎,一个是执行引擎。
调度引擎根据任务属性(周期,延迟,依赖关系等)调度任务,根据任务优先级,队列和资源情况分发到不同的执行节点;
执行引擎获取满足执行条件的任务,执行任务,同时输出任务执行过程中的日志,并监控任务执行过程。
在目前市面上常见的离线调度系统中,airflow可以说是其中的佼佼者,经过了多年的发展,功能已经非常完善,在开源社区也非常活跃。
Airflow于2014年10月由Airbnb的Maxime Beauchemin开始;
2015年6月宣布正式加入Airbnb Github;
2016年3月加入了Apache Software Foundation的孵化计划;
目前的更新迭代版本已经到了1-10版本;2-1版本。
我们oppo的离线调度系统是在airflow 1.8版本上引入进来的。
下面是几个在airflow系统中的概念,在其它的离线调度系统中也有类似的概念。
  1. DAG:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的依赖关系。在airflow中,DAG由一个可执行的python脚本来定义。
  2. Operators:可以理解为一个任务模板,描述了DAG中一个具体的task要做的事情。airflow内置了很多operators,如BashOperator 用来执行bash 命令,PythonOperator 调用任意Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令…同时,用户可以自定义Operator,这给用户提供了极大的便利性。他的作用就像java中的class文件。
  3. Sensor是一类特殊的Operator,是被特定条件触发的,比如ExteralTaskSensor, TimeSensor, TimeDeltaSensor。
  4. Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node, 当用户实例化一个operator,即用一些参数特例化一个operator,就生成了一个task。
  5. DagRun:当dag文件被airflow识别并调度时,运行中的DAG即为dagRun。在webUi界面可以看到。
  6. Task Instance:task的一次运行。即运行起来的task,task instance 有自己的状态,包括“running”, “success”, “failed”, “skipped”, “up for retry”等。
在airflow中,定义dag和dag中的任务是通过一个python文件实现的,这就是一个例子。
这个py文件是来定义dag的,虽然它也开源直接运行,但单独运行并没有什么效果,只是检测python语法是否正确。他也不执行具体的工作,只是描述任务之间的依赖关系,以及调度的时间间隔等要求。
这个需要在任务调度和执行时进行解析,才能按照设定逻辑调度,按照用户设定的执行步骤运行。


图5& 图6:原生airflow定义task和dag的方式
这样一个python文件就对数据开发人员提出了比较高的要求,需要平台的用户对python编码很熟练才行。
下图是airflow的整体架构设计,其中airflow home dags用于存储定义dag和task的python文件,webserver用于提供web服务,展示dag视图,实例,日志等非常多的信息,airflow的web页面也是很完善的。scheduler是调度节点,执行dag解析,任务调度的工作;worker节点是执行节点,可以有很多组,可以监听不同的队列,作用是执行scheduler调度起来的任务。

图7:airflow架构
我们oppo的离线调度系统oflow就是在开源airflow的基础上开发的。
开发中解决的几个比较核心的问题是:
  1. 将dag和task的定义从python文件修改为web配置数据库存储,同时dag的解析也是从解析python文件修改为了从数据库查询并解析。
  2. 另外一个就是和公司的大数据开发平台结合,实现开发,测试和发布流程,方便用户的开发,测试验证和发布流。
  3. 另外还添加了很多的监控告警,用来比较全面的监控任务调度和执行的整个流程;
如下是我们OFLOW平台的整个架构:

图8:oflow架构设计
  1. webserver用来提供web服务,方便用户进行dag,task的配置以及非常多的信息查询;
  2. scheduler是调度节点,负责任务的调度,通过解析dag和task,进行一系列的逻辑判断任务是否满足调度条件;
  3. worker是执行节点,负责任务实例的执行;
  4. api server是我们后来开发中新增的一个组件,用来解耦worker和我们数据库的操作,后续也承担的其它的一些功能;
  5. 使用mysql存储dag,task,task_instance等所有的元数据信息;
  6. 使用celery做消息队列,broker使用的是redis;同时redis也充当了缓存的作用;
  7. oflow也同时接入云监控负责发送告警信息,使用ocs用于存储日志和用户脚本文件;
  8. 同时oflow也接入了诊断平台,这个是最新接入的,协助用户对异常的oflow任务进行诊断;
如下这个图显示了整个任务调度和执行的整个流程:

图9:oflow中的任务实例状态流转
目前OFLOW也有了比较全面的监控:

图10:oflow的监控告警
以上就是OFLOW的整体架构,任务调度和执行整个流程。
目前OFLOW的整个服务也存在一些问题:
  1. 任务调度间隔问题:根据前面的任务调度的流程,我们可以看到,oflow任务的调度是通过scheduler周期扫描解析dag和task的。这种方式就会造成任务上下游之间会有一定时间的延迟。比如A任务完成后,直接下游任务B并不能马上被调度执行,需要等待scheduler下次扫描时扫到改任务才能被触发。如果任务的依赖深度比较深,上下游链条很长,每两个任务间有一定间隔,整体的间隔时间就会比较久。尤其是在凌晨任务调度高峰这样的时间点。
  2. 服务高可用问题:原生的oflow不支持高可用。目前我们的方案是准备一个备节点,在检测到scheduler异常时,可以拉起备用节点。
  3. 业务增长造成的调度压力问题:目前oflow每日的任务量非常多,而且也在快速增长,oflow的调度压力也是越来越高,目前的方案的对scheduler进行横向扩展,让不同的scheduler调度不同的dag;
  4. 调度峰谷的成本问题:离线调度任务的一个很明显的特征就是存在任务的高峰和低谷。oflow的天级别和小时级别的调度任务是最多的,这样就会造成在每天的凌晨时间是任务调度的大高峰,在每小时的前一段时间是调度的小高峰,而其它时间段则是低谷。高峰状态任务会出现队列拥堵情况,而低谷时间,机器是处于比较空闲的状态。如何更有效的利用系统资源,也是值得我们后续思考和优化的点。
4 全新的离线调度系统OFLOW 2.0
下面再向大家介绍一下,近期已经上线试用的OFLOW 2.0的产品特殊和架构设计。
我们oflow 2.0平台想解决的问题有以下几点:
  1. 任务实时触发,降低上下游任务之间的延迟;
  2. 不再以dag去组织和调度任务。以dag为调度维度,就会存在跨周期依赖的问题。实际中会有很多任务需要依赖其它dag的任务,比如一个天级别的任务需要依赖另一个小时级别的dag的某个任务在24个周期要全部完成。目前oflow的解决方案是通过一个跨dag依赖任务ExternalTaskSensor去实现的。这个无论是在任务配置上,还是在对概念的理解上,都存在一些问题;
  3. 另外就是希望能简化配置,oflow的dag和task的功能比较强大,但是配置也非常多,用户完成一个dag,一个task的配置需要理解很多概念,输入很多信息。这样好处是比较灵活,但是缺点就是很不方便。我们2.0就希望能够简化配置,隐藏一些不必要的概念和配置;
  4. 同时还希望能更使用户在任务开发,测试和发布等一系列流程更加便捷;
  5. 2.0的各个组件能在高可用和可扩展性上更加便捷简单。
oflow 2.0系统就通过以和1.0差别很大的设计实现这些需求:
  1. 任务实时触发;
  2. 以为业务流程方式组织任务,而非dag,不再需要跨dag依赖的概念;
  3. 各个组件的可扩展性;
  4. 系统的标准化:简化了很多任务的配置,操作门槛更低。任务执行环境标准化,减少环境问题,降低运维方面的成本。
oflow 2.0的整体架构设计如下:
oflow 2.0当前是没有供用户使用的前端页面,是通过南天门2.0的离线模块调用oflwo 1.0的api server。所以你们在使用oflow 2.0的离线模块时,后端的数据存储,任务触发,调度,执行等一系列流程都是在oflow 2.0的平台上实现的。

图11:oflow 2.0的架构设计
  1. 首先的这个组件就是api server。除了南天门调用之外,oflow 2.0内部的worker执行节点也和api server有很多交互;apiserver主要实现的是和2.0数据库的交互,业务流程,任务,实例等各项操作,以及上游任务触发等内在逻辑;
  2. Trigger组件的功能比较纯粹,就是负责扫描任务进行触发;
  3. scheduler调度节点负责任务的调度解析,通过时间轮,任务依赖信息管理,任务优先级和队列等一系列的服务和管理来分析和调度任务;
  4. worker节点和1.0的逻辑比较接近,负责任务的实际执行过程,支持了包括shell, python, sparkSQL和数据集成任务这四种大的类型的任务,同时也支持用户对开发的脚本进行测试,任务执行日志的处理,支持对正在执行的任务进行停止操作,同时还有任务执行结束后的回调逻辑;
  5. Monitor组件一方面是负责监控内部各个组件,其它各个组件在启动后都会向monitor进行注册,后续一旦节点出问题,monitor可以对在该节点上调度和执行的任务进行处理。monitor同时还负责处理任务执行过程中的各种告警信息和一些通知性信息的发送;
其中还有两个消息队列,
  1. 一个是Schedule MQ,负责接收满足部分调度条件可以开始调度的任务并转交给scheduler去处理;
  2. 另一个是Task MQ,负责接收满足所有依赖条件,可以执行的任务,worker端从队列中获取任务并消费。
除了这些开发的组件之外,oflow 2.0也用到了一些通用的产品,包括MySQL, Redis,以及对象存储存在,云监控系统,以及调用了公司IT系统的一些api。
这张图展示了OFLOW的任务调度和执行的整个流程:

图12:oflow 2.0任务实例的调度和执行流程
其中调度开始入口有两个,一个是trigger, 一个是webserver。
trigger负责提前5分钟扫描即将要执行的任务,扫描出来之后放入到schedule mq中;
webserver负责多个触发逻辑,一方面是用户手动触发的任务重跑和补录操作,另一个是上游某个任务完成后,将其直接下游获取出来,放入到schedule mq;
这些消息在schedule mq中会被scheduler消费,schedule会分析任务实例的所有依赖关系,包括时间依赖,上下游依赖,自身依赖等信息。如果任务的各种依赖条件都满足,则会被放到task mq中被worker消费;不满足时间依赖的任务会被放入到时间轮中,等达到相应时间刻度后会自动触发;不满足执行条件的任务的所有依赖信息保存在redis中,等后续时间到达,或者依赖的上游任务完成,会不断更新该实例的依赖信息,直到所有依赖条件满足。满足依赖条件的任务,schedule也会分析任务所属的项目以及任务优先级等配置信息,将任务放入到task mq中的不同的消息队列中;
worker会从task mq中消费任务。拿到任务后,通过获取的任务的详细信息,然后执行。判断任务执行结果,如果执行成功,则会通知到api server, api server除了更新实例状态等信息外,还会同时查询该任务的直接下游,将其直接下游放入到schedule mq中;如果任务失败,则会根据是否还有重试次数决定是否要重试,如果没有重试次数则会认定任务失败,通知到api server, api serer更新实例状态。
目前OFLOW 2.0已经完成了所有的设计,开发和测试环境,应经过了一段时间的内测和压力测试等环节。最近也已经开放试用了。欢迎大家试用2.0系统,并在试用过程中给与反馈和建议。
目前用户如果想使用我们的OFLOW 2.0系统的话,可以登录南天门2.0平台上试用。
5 结语
以上就是我跟大家分享的OFLOW的一些信息。
在此我也展望一下我们后续OFLOW平台的发展:
1)OFLOW 1.0的调度性能问题。由于2.0和1.0系统的变化较大,后续OFLOW 1.0和2.0平台会在一段较长的时间内共存,因此对1.0系统的调度性能我们也需要不断去优化,以应对高速增长的任务量;
一方面是想办法缩短任务间的调度间隔,以提升任务执行效率;
另一方面是希望能探索更便捷有效的扩展方式,应对调度任务量的增加。

图13:oflow的任务增长趋势
2)交互体验上
页面交互的友好性上进行完善;添加一系列的批量任务操作和运维方面的功能;同时还希望以dag或者task等维度展示历史统计信息,以供用户参考;另外就是针对任务操作审计,任务的监控系统进行优化;
3)成本优化
另外一个就是前面提到的成本优化,下图反映的是一天中24个小时的任务并发执行情况,任务存在非常明显的高峰和低谷。

图14:oflow每日各时间段内的实例数量
后续考虑想办法对任务错峰执行,比如在计费模式上去鼓励用户将时效性要求不高的任务放在任务低谷进行执行;另外一个就是希望探索一下资源的动态扩缩容来实现成本优化。
4)另外还希望后续OFLOW不单单起到一个任务调度的作用,希望后续能和后端的大数据集群有更多的交互;
5)还有一点就是希望对监控进行进一步的完善。其中比较关键的一个是核心任务的链路的识别和监控。
就是不但要能监控到核心任务,还能将该核心任务的所有上游逻辑监控到,链路中的某个环节一旦异常,能够很快的告警出来;另外一点是用户收到告警时的处理,很多用户收到任务告警后不清楚如何处理,后续oflow会想办法引导用户处理。
作者简介
Chengwei OPPO高级后端工程师
主要负责OPPO的大数据离线任务调度系统的开发工作,对大数据离线调度系统有比较丰富的开发经验。
获取更多精彩内容,请扫码关注[OPPO数智技术]公众号
?

    推荐阅读