DolphinScheduler 任务失败告警脚本

不飞则已，一飞冲天；不鸣则已，一鸣惊人。这篇文章主要讲述 DolphinScheduler 任务失败告警脚本相关的知识，希望能为你提供帮助。
【DolphinScheduler 任务失败告警脚本】

#!/usr/bin/env python
import base64
import datetime
import hashlib
import hmac
import json
import os
import sys
import time
import urllib
import urllib.parse

import jsonpath
import pymysql
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from DBUtils.PooledDB import PooledDB

# Database connection pool. Requires DBUtils 1.x: pip3 install DBUtils==1.3
# (DBUtils 2.x renamed the module to ``dbutils.pooled_db``).
# Connection settings may be overridden through environment variables so the
# credentials do not have to be edited into the script; the defaults are the
# values this script originally hard-coded.
POOL = PooledDB(
    creator=pymysql,  # DB-API module used to open connections
    maxconnections=6,  # max connections the pool allows; 0/None = unlimited
    mincached=2,  # idle connections created at start-up; 0 = none
    maxcached=5,  # max idle connections kept in the pool; 0/None = unlimited
    # Max number of shared connections. Has no effect with pymysql: its
    # threadsafety level is 1, so DBUtils never shares connections and every
    # connection handed out is dedicated.
    maxshared=3,
    blocking=True,  # when the pool is exhausted, wait instead of raising
    maxusage=None,  # how often one connection may be reused; None = unlimited
    setsession=[],  # SQL run at session start, e.g. ["set time zone ..."]
    # When to check the server with ping(): 0 = never, 1 = whenever a
    # connection is fetched from the pool, 2 = when a cursor is created,
    # 4 = when a query is executed, 7 = always. This file imports APScheduler's
    # BlockingScheduler, presumably to poll from a long-lived process; with
    # ping=0 a connection dropped by the server while idle would make the next
    # query fail, whereas ping=1 lets DBUtils reconnect it transparently.
    ping=1,
    host=os.environ.get("DS_DB_HOST", "cdh3"),
    port=int(os.environ.get("DS_DB_PORT", "3306")),
    user=os.environ.get("DS_DB_USER", "root"),
    password=os.environ.get("DS_DB_PASSWORD", "123456"),
    database=os.environ.get("DS_DB_NAME", "dolphinscheduler"),
    charset="utf8",
)


def get_timestamp_sign(
    secret="SECf60cb44c2d678069959bacaed66c08d8bd72248e3384d0736336c9d78d2fab70",
):
    """Build the ``timestamp``/``sign`` pair for a signed DingTalk robot call.

    The string ``"<timestamp>\\n<secret>"`` is HMAC-SHA256-signed with the
    secret as key, then Base64-encoded and URL-encoded (``quote_plus``).

    Args:
        secret: The robot's signing secret (it starts with ``SEC``). Defaults
            to the secret this script was configured with.

    Returns:
        A ``(timestamp, sign)`` tuple of strings: the current Unix time in
        milliseconds and the URL-encoded signature.
    """
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode("utf-8")
    string_to_sign = "{}\n{}".format(timestamp, secret)
    hmac_code = hmac.new(
        secret_enc, string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
    ).digest()
    # b64encode returns bytes; quote_plus accepts bytes and returns str.
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign


def get_data_from_mysql(sql, pool=None):
    """Execute ``sql`` on a pooled connection and return every result row.

    The cursor is always closed, and the connection is always handed back to
    the pool, on both the success and the failure path.

    Args:
        sql: The SQL statement to execute.
        pool: Pool to borrow a connection from; defaults to the module-level
            ``POOL``. Any object whose ``connection()`` returns a DB-API
            connection will do.

    Returns:
        The rows from ``cursor.fetchall()``, or ``None`` if the query failed.
    """
    if pool is None:
        pool = POOL
    connect = pool.connection()
    try:
        cursor = connect.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    except Exception as exc:
        # Best-effort: report and signal failure with None rather than crash.
        print("Error: unable to fetch data: %s" % exc)
        return None
    finally:
        # Release the connection back to the pool on every path.
        connect.close()


def send_dingding(title, text, access_token="改成自己的钉钉Token"):
    """Post a Markdown message to a DingTalk group robot, @-mentioning everyone.

    The request URL is signed with ``get_timestamp_sign()``.

    Args:
        title: Message title.
        text: Markdown body. Every newline is doubled before sending,
            presumably so that each line renders as its own paragraph in
            DingTalk Markdown.
        access_token: The robot webhook's access token. The default is the
            placeholder this script shipped with and must be replaced.

    Returns:
        ``"ok"`` when the response JSON has ``errmsg == "ok"``, otherwise
        ``"fail: <raw response body>"``.

    Raises:
        requests.RequestException: on network errors or the request timeout.
    """
    timestamp, sign = get_timestamp_sign()
    url = (
        "https://oapi.dingtalk.com/robot/send?access_token=" + access_token
        + "&timestamp=" + timestamp
        + "&sign=" + sign
    )
    headers = {"Content-type": "application/json; charset=utf-8"}
    values = {
        "msgtype": "markdown",
        "markdown": {
            "title": str(title),
            # Double the newlines on the raw text, before JSON serialization,
            # so escaped backslashes in the payload are never touched.
            "text": str(text).replace("\n", "\n\n"),
        },
        "at": {"isAtAll": True},
    }
    # Default ensure_ascii=True keeps the body pure ASCII, which is safe to send
    # as a str body.
    res = requests.post(url, data=json.dumps(values), headers=headers, timeout=10)
    try:
        body = json.loads(res.text)
    except ValueError:
        body = {}
    if isinstance(body, dict) and body.get("errmsg") == "ok":
        return "ok"
    return "fail: %s" % res.text


def analysis_processing():
monitoring_time = (datetime.datetime.now(
) - datetime.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
print(监控时间 + monitoring_time)

process_instance =
SELECT
d.name as project_name,
c.name as workflow_name,
b.name as process_name,
a.NAME as task_name,
a.task_type,
a.start_time,
a.task_json,
a.app_link,
a.end_time,
SUBSTRING_INDEX(a.`host`,:,1) as exec_host ,
a.log_path
FROM t_ds_task_instance a
join t_ds_process_instance b on a.process_definition_id =b.process_definition_id and a.process_instance_id=b.id
join t_ds_process_definition c on b.process_definition_id=c.id
join t_ds_project d on c.project_id=d.id
WHERE a.state = 6 and a.end_time> =
.format(monitoring_time)

alert_instance =
SELECT
create_time,
title,
log,
content
FROM
t_ds_alert
WHERE
title NOT LIKE %%success%%
AND create_time > =
.format(monitoring_time)
# print(process_instance)

result_process_instance = get_data_from_mysql(process_instance)
# print(result_process_instance)
result_alert = get_data_from_mysql(alert_instance)

# 任务告警相关
if result_process_instance is not None:
for result in result_process_instance:
project_name = result[0]# 项目名称
workflow_name = result[1]# 工作流定义名称
process_name = result[2]

    推荐阅读