【xxl-job】分布式任务调度平台 使用总结

一、简介 【【xxl-job】分布式任务调度平台 使用总结】XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。
1.1 使用XXL-JOB的背景
在微服务架构中,由于需要满足高可用,所以每个微服务都可能部署多个实例,而这时候微服务中的定时任务就会出现重复执行问题。通过XXL-JOB可以实现定时任务的分布式调度,且可避免同一执行器的任务重复执行问题。
二、系统架构 【xxl-job】分布式任务调度平台 使用总结
文章图片

调度中心:采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA
执行器:任务分布式执行,"执行器"支持集群部署,可保证任务执行HA
数据库:数据库使用Mysql,如果调度中心有多个实例,需要连接同一个Mysql数据库
三、调度中心 3.1 主要功能

  1. 注册中心,维护执行器的在线状态
  2. 维护任务信息
  3. 触发调度
3.2 启动流程
【xxl-job】分布式任务调度平台 使用总结
文章图片

3.3 关键源码
3.3.1 任务调度 下面是一次任务调度的过程:
  • 预读下一次触发时间trigger_next_time <【当前时间+5秒】的任务
  • 遍历任务,将当前时间与任务的下一次触发时间进行比对
    • 如果当前时间 > 任务的下一次触发时间+5秒,说明该次任务调度过期,通过任务的调度过期策略来判断是否需要立即调度一次或忽略
    • 否则判断,如果当前时间 > 任务的下一次触发时间,立即调度一次,并计算任务的下一次触发时间。如果新计算的下一次触发时间小于当前时间+5秒,则将任务直接置入ringData
    • 否则代表当前时间 < 任务的下一次触发时间,将任务置入ringData
  • 更新xxl_job_info表中任务的调度信息(如下一次调度时间)
try {// 获取数据库连接,并将autoCommit设置为false,不自动提交 conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); // 使用for update进行加锁,防止调度中心其他实例同时进行下面的调度工作 preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // 事务开始// 1、预读下一次触发时间trigger_next_time小于【当前时间+5秒】的任务 long nowTime = System.currentTimeMillis(); List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、将任务置入ringData,ringData是一个ConcurrentHashMap,key为秒数,value为List<任务id> for (XxlJobInfo jobInfo: scheduleList) {// 如果当前时间大于下一次触发时间+5秒,说明该次任务调度过期 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 判断任务的调度过期策略 MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// 调度策略为立即调度一次 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); }// 刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) {// 如果当前时间大于下一次触发时间// 立即调度 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date()); // 如果新计算的下一次调度时间小于当前时间+5秒 if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 获取调度时间的秒数 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 将任务置入ringData pushTimeRing(ringSecond, jobInfo.getId()); // 刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); }} else {// 当前时间小于等于任务下一次触发时间// 获取调度时间的秒数 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 将任务置入ringData pushTimeRing(ringSecond, jobInfo.getId()); // 刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); }}// 3、更新xxl_job_info表中任务的调度信息(如下一次调度时间) for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); }} else { preReadSuc = false; }// 事务结束 }

3.3.2 触发调度 下面是一次触发调度的过程:
  • 每次取当前秒和上一秒的数据,如当前秒为10,则获取10、9秒对应的任务,添加到ringItemData
  • 遍历ringItemData,逐一触发调度
  • 清空ringItemData
try { // 每次取当前秒和上一秒的数据,如果当前秒为10,则取10、9秒对应的任务 List ringItemData = https://www.it610.com/article/new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List tmpData = https://www.it610.com/article/ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } }logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // 遍历需要进行调度的任务 for (int jobId: ringItemData) { // 触发调度 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } }

四、执行器 4.1 主要功能
  1. 向调度中心注册心跳
  2. 接收调度中心发出的调度请求,执行任务的业务逻辑
  3. 将任务的执行结果发送给调度中心
4.2 启动流程
【xxl-job】分布式任务调度平台 使用总结
文章图片

4.3 关键源码
4.3.1 任务执行 下面是执行器执行一次任务的过程:
  1. 调度中心发起调度请求
  2. 执行器接收到调度请求
  3. 判断任务的运行模式(BEAN、GLUE_GROOVY、GLUE_SCRIPT,由于生产环境中常用的为BEAN模式,这里就只介绍BEAN模式,其他两种模式类似)
  4. 如果为BEAN模式,判断任务是否已绑定JobThread线程
    • 如果已绑定,判断任务的JobHandler是否发生改变
      • 如果发生改变,则跳到第5步
      • 如果没有发生改变,进一步判断阻塞策略
        • SERIAL_EXECUTION:单机串行,置入任务对应JobThread线程的triggerQueue队列排队
        • DISCARD_LATER:如果任务对应JobThread线程正在运行,则直接返回,丢弃该次调度请求
        • COVER_EARLY:覆盖调度,跳到第5步
    • 如果未绑定,则下一步
  5. 注册新的JobThread至jobThreadRepository中并且启动,中断并移除旧的JobThread
  6. 将任务置入JobThread线程的triggerQueue队列排队
  7. JobThread线程不断从自身的triggerQueue队列中获取数据来执行
public ReturnT run(TriggerParam triggerParam) { // 获取任务对应的jobThread线程 JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // 获取运行模式 GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) {// 获取JobHandler(就是bean实例 + @XxlJob注解的方法) IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // 判断JobHandler是否发生改变 if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; }// 判断JobHandler是否为空 if (jobHandler == null) { jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } }} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // ... } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // ... } else { return new ReturnT(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); }// 如果任务对应的JobThread线程不为空,则判断任务的阻塞策略 if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// 如果任务正在运行,则丢弃此次调度 if (jobThread.isRunningOrHasQueue()) { return new ReturnT(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// 覆盖调度,将对应的JobThread线程置为null if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // 将调度入队 } }// 判断JobThread是否为空,如果为空,则注册新的JobThread if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); }// 将调度信息推入到JobThread的调度队列triggerQueue ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; }

五、相关表
表名 描述
xxl_job_group 执行器表,维护执行器的信息
xxl_job_info 任务表,包括任务的调度配置、执行器JobHandler、失败重试次数、下一次调度时间等信息
xxl_job_lock 用作为排他锁
xxl_job_log 任务日志表,包括执行器地址、调度日志、调度结果、执行日志、执行结果等信息
xxl_job_log_report 任务报表,以天为单位进行计算
xxl_job_registry 注册表,维护执行器的心跳
xxl_job_user 用户表

    推荐阅读