使用AirFlow调度MaxCompute

简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务。
背景
airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务。
一、环境准备

  • Python 2.7.5 PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7
1.安装MaxCompute需要的包
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。
pip install cython>=0.19.0 # 可选,不建议Windows用户安装。
pip install pyodps
注意:如果requests包冲突,先卸载再安装对应的版本
2.执行如下命令检查安装是否成功
python -c "from odps import ODPS"
二、开发步骤
使用AirFlow调度MaxCompute
文章图片

1.在Airflow家目录编写python调度脚本Airiflow_MC.py
# -*- coding: UTF-8 -*-import sysimport osfrom odps import ODPSfrom odps import optionsfrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom datetime import datetime, timedeltafrom configparser import ConfigParserimport timereload(sys)sys.setdefaultencoding('utf8')#修改系统默认编码。# MaxCompute参数设置options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}cfg = ConfigParser()cfg.read("odps.ini")print(cfg.items())odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))default_args = {'owner': 'airflow','depends_on_past': False,'retry_delay': timedelta(minutes=5),'start_date':datetime(2020,1,15)# 'email': ['airflow@example.com'],# 'email_on_failure': False,# 'email_on_retry': False,# 'retries': 1,# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),}dag = DAG('Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))def read_sql(sqlfile):with io.open(sqlfile, encoding='utf-8', mode='r') as f:sql=f.read()f.closedreturn sqldef get_time():print '当前时间是{}'.format(time.time())return time.time()def mc_job ():project = odps.get_project()# 取到默认项目。instance=odps.run_sql("select * from long_chinese; ")print(instance.get_logview_address())instance.wait_for_success()with instance.open_reader() as reader:count = reader.countprint("查询表数据条数:{}".format(count))for record in reader:print recordreturn countt1 = PythonOperator (task_id = 'get_time' ,provide_context = False ,python_callable = get_time,dag = dag )t2 = PythonOperator (task_id = 'mc_job' ,provide_context = False ,python_callable = mc_job ,dag = dag )t2.set_upstream(t1)

2.提交
python Airiflow_MC.py
3.进行测试
# print the list of active DAGsairflow list_dags# prints the list of tasks the "tutorial" dag_idairflow list_tasks Airiflow_MC# prints the hierarchy of tasks in the tutorial DAGairflow list_tasks Airiflow_MC --tree#测试taskairflow test Airiflow_MC get_time 2010-01-16airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务
登录到web界面点击按钮运行
【使用AirFlow调度MaxCompute】使用AirFlow调度MaxCompute
文章图片

5.查看任务运行结果
(1)点击view log
使用AirFlow调度MaxCompute
文章图片

(2)查看结果
使用AirFlow调度MaxCompute
文章图片

原文链接
本文为阿里云原创内容,未经允许不得转载。

    推荐阅读