Elastic-Job-Lite|Elastic-Job-Lite 源码分析-作业初始化过程

最近对分布式调度系统比较感兴趣,Elastic-Job就是其中一款比较常用的开源分布式调度系统,为了更深入的了解他,打算对他的核心代码做一个全面的分析,今天先让我们来分析下他的整个初始化过程。
从一个SimpleJob入手 Elastic-Job支持3中调度作业:SimpleJob、DataflowJob和ScriptJOb
我们从SimpleJob的使用Demo入手,由浅入深的打开整个作业的初始化过程的神秘面纱。
main方法中整个流程如下:
首先,通过createRegistryCenter()方法创建一个用于协调分布式服务的注册中心,说白了就是个ZK Client;
然后,调用createJobConfiguration()创建一个作业配置;
最后用上面创建的ZK注册中心和作业配置来配置作业调度器,并初始化。
接下来分析下createJobConfiguration() ,然后进入我们今天的主题JobScheduler.init() 调度作业初始化过程。

public class Application {public static void main(String[] args) { // 创建、初始化zk client CoordinatorRegistryCenter registryCenter = createRegistryCenter(); // 创建作业配置 LiteJobConfiguration liteJobConfiguration = createJobConfiguration(); // 创建调度作业并初始化 new JobScheduler(registryCenter, liteJobConfiguration).init(); }/** * 创建注册中心 * @return */ private static CoordinatorRegistryCenter createRegistryCenter() { String zkServerLists = "127.0.0.1:2181"; String namespace = "namespace"; // zk配置 ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(zkServerLists, namespace); // 创建、初始化zk client CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); regCenter.init(); return regCenter; }/** * 创建作业配置 * @return */ private static LiteJobConfiguration createJobConfiguration() { // 定义作业核心配置 String jobName = "myElasticJob"; String cron = "0/3 * * * * ?"; int shardingTotalCount = 1; JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build(); // 定义SIMPLE类型配置 String jobClasName = MyElasticJob.class.getCanonicalName(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfig, jobClasName); // 定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); return simpleJobRootConfig; } }

createJobConfiguration()
我们来看下createJobConfiguration()方法里做了什么:
【Elastic-Job-Lite|Elastic-Job-Lite 源码分析-作业初始化过程】通过建造者模式创建出Job核心配置类的JobCoreConfiguration一个实例
public final class JobCoreConfiguration { // 必填 private final String jobName; // 必填 private final String cron; /** * 作业分片总数。如果一个作业启动超过作业分片总数的节点,只有 shardingTotalCount 会执行作业。必填。 */ private final int shardingTotalCount; /** * 分片序列号和参数用等号分隔,多个键值对用逗号分隔 * 分片序列号从0开始,不可大于或等于作业分片总数 * 如: * 0=a,1=b,2=c */ private final String shardingItemParameters; /** * 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业 * 例:每次获取的数据量、作业实例从数据库读取的主键等 */ private final String jobParameter; /** * 是否开启作业执行失效转移。开启表示如果作业在一次作业执行中途宕机,允许将该次未完成的作业在另一作业节点上补偿执行。 * 默认为 false。选填。 */ private final boolean failover; /** * 是否开启错过作业重新执行。 * 默认为 true。选填。 */ private final boolean misfire; private final String description; /** * 作业属性配置。选填。 */ private final JobProperties jobProperties; }

JobCoreConfiguration中主要是一些作业基础信息,其中jobName和cron以及shardingTotalCount是必填项,其他选填。
然后new 一个SimpleJobConfiguration对象,SimpleJobConfiguration类很简单,只有三个成员变量:
public final class SimpleJobConfiguration implements JobTypeConfiguration {// job核心配置 private final JobCoreConfiguration coreConfig; // job类型,Elastic-Job 有SIMPLE, DATAFLOW和SCRIPT这三种类型,这里创建的是SIMPLE类型 private final JobType jobType = JobType.SIMPLE; // 自定义的job全路径名 private final String jobClass; }

其中,jobClass表示用户自定义的Job类的全路径,例如我们创建的MyElasticJob类:
public class MyElasticJob implements SimpleJob {@Override public void execute(ShardingContext shardingContext) { log.info("shardingContext = {}", shardingContext); switch (shardingContext.getShardingItem()) { case 0: log.info("Item = {}", 0); break; case 1: log.info("Item = {}", 1); break; case 2: log.info("Item = {}", 2); break; } } }

对应的jobClass=io.elasticjob.lite.example.MyElasticJob
最后一样使用创建者模式构建出一个LiteJobConfiguration类的对象:
public final class LiteJobConfiguration implements JobRootConfiguration {/** * 作业类型配置,例如:SimpleJobConfiguration. */ private final JobTypeConfiguration typeConfig; /** * 监控作业执行时状态, 默认为true * 1. 每次作业执行时间和间隔时间均非常短的情况, 建议不监控作业运行时状态以提升效率, * 因为是瞬时状态, 所以无必要监控. 请用户自行增加数据堆积监控. 并且不能保证数据重复选取, * 应在作业中实现幂等性. 也无法实现作业失效转移. * 2. 每次作业执行时间和间隔时间均较长短的情况, 建议监控作业运行时状态, 可保证数据不会重复选取. */ private final boolean monitorExecution; /** * 设置最大容忍的本机与注册中心的时间误差秒数。默认为 -1,不检查时间误差。选填。 */ private final int maxTimeDiffSeconds; // 作业辅助监控端口. private final int monitorPort; // 作业分片策略实现类全路径 private final String jobShardingStrategyClass; /** * 服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复。默认为 10。 */ private final int reconcileIntervalMinutes; // 作业是否启动时禁止. private final boolean disabled; // 本地配置是否可覆盖注册中心配置. private final boolean overwrite; /** * 获取作业名称. * * @return 作业名称 */ public String getJobName() { return typeConfig.getCoreConfig().getJobName(); }/** * 是否开启失效转移. * 开启后,如果某个作业节点挂掉了,主节点会将作业分配到另外一个正常的作业节点上, * 保证定时任务到点时能被正常执行。 * @return 是否开启失效转移 */ public boolean isFailover() { return typeConfig.getCoreConfig().isFailover(); } }

注册中心实例和作用配置实例创建完成, 接下来用他们来实例化JobScheduler,并初始化。
JobScheduler.init() 上面的热身结束,接下来开始今天的重点,分析下作业调度器JobScheduler的实例创建和初始化过程:
/** * 作业调度器. */ public class JobScheduler {public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob"; private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade"; private final LiteJobConfiguration liteJobConfig; private final CoordinatorRegistryCenter regCenter; // 为调度器提供内部服务的门面类. @Getter private final SchedulerFacade schedulerFacade; // 作业内部服务门面服务. private final JobFacade jobFacade; public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) { this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners); }public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, final ElasticJobListener... elasticJobListeners) { this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners); }private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) { JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance()); this.liteJobConfig = liteJobConfig; this.regCenter = regCenter; List elasticJobListenerList = Arrays.asList(elasticJobListeners); // 给[分布式作业中只执行一次的监听器]设置[保证分布式任务全部开始和结束状态的服务]. setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList); // 创建[为调度器提供内部服务的门面类]对象. schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList); // 创建[为作业提供内部服务的门面类]对象. jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus); }/** * 给[分布式作业中只执行一次的监听器]设置[保证分布式任务全部开始和结束状态的服务]. * @param regCenter * @param elasticJobListeners */ private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List elasticJobListeners) { GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName()); for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService); } } }/** * 初始化作业. */ public void init() {/** * 1. 更新、拉取ZK上的作业配置. * 这里有三种情况: * * 1)作业第一次启动时ZK上还没有该作业的配置信息 *将作业配置持久化为ZK上以该作业名命名的节点下的config节点的值; * * 2)作业非第一次启动,但是作业配置的overwrite=true,即覆盖作业配置 *用本地的作业配置更新ZK上的config节点值; * * 3)非上面两钟情况,则拉取ZK上的config节点配置,作为本次作业的配置信息。 */ LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); /** * 2. 设置分片总数. * * 作业注册表中维护中一张[作业名-分片数]的对照表: * currentShardingTotalCountMap 使用线程安全的ConcurrentHashMap, * 记录着每个作业的分片数. */ JobRegistry.getInstance(). setCurrentShardingTotalCount( liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig() .getShardingTotalCount() ); // 3. 创建调度控制器, 控制作业的调度、启、停. JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName() ); /** * 4. 添加作业调度控制器. * * 作业注册表中维护中一张[作业名-作业控制器]的对照表: * schedulerMap 使用线程安全的ConcurrentHashMap, * 记录着每个作业的调度控制器. */ JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); /** * 5. 注册作业启动信息. */ schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled()); // 6. 开启作业调度. jobScheduleController.scheduleJob( liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron() ); }/** * 创建jobDetail. * 将jobFacade和我们最开始新建的MyElasticJob类的实例放入JobDataMap中. * @param jobClass * @return */ private JobDetail createJobDetail(final String jobClass) { JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build(); result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade); Optional elasticJobInstance = createElasticJobInstance(); if (elasticJobInstance.isPresent()) { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get()); } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) { try { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance()); } catch (final ReflectiveOperationException ex) { throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass); } } return result; }...}

JobScheduler.init()方法主要做:
  1. 更新、拉取ZK上的作业配置.
  2. 作业注册表中记录分片总数.
  3. 创建调度控制器.
  4. 添加作业调度控制器.
  5. 注册作业启动信息.
  6. 调度作业.
其中调用SchedulerFacade.registerStartUpInfo(final boolean enabled)注册作业启动信息:
/** * 为调度器提供内部服务的门面类. */ public final class SchedulerFacade {.../** * 注册作业启动信息. * * @param enabled 作业是否启用 */ public void registerStartUpInfo(final boolean enabled) {// 开启所有监听器. listenerManager.startAllListeners(); // 选举成功后,将临时节点/jobName/leader/elections/instance的值设置为 // 调度作业唯一标示:ip@-@进程id. leaderService.electLeader(); /** * 持久化作业服务器上线信息. * 将永久节点/jobName/servers/ip 的值设置为""空字符串或"DISABLED". */ serverService.persistOnline(enabled); /** * 持久化作业运行实例上线相关信息. * 在/jobName/instances节点下新建临时节点[ip@-@进程id], 值为""空字符串 */ instanceService.persistOnline(); /** * 设置需要重新分片的标记. * 设置永久节点/jobName/leader/sharding/necessary, 值为""空字符串 */ shardingService.setReshardingFlag(); // 初始化作业监听服务. monitorService.listen(); // 开启调解分布式作业不一致状态服务. if (!reconcileService.isRunning()) { reconcileService.startAsync(); } }/** * 终止作业调度. */ public void shutdownInstance() { if (leaderService.isLeader()) { leaderService.removeLeader(); } monitorService.close(); if (reconcileService.isRunning()) { reconcileService.stopAsync(); } JobRegistry.getInstance().shutdown(jobName); } }

至此Elastic-Job-Lite的整个调度作业的初始化过程完成,最后我们看下JobScheduler这个类的继承关系
Elastic-Job-Lite|Elastic-Job-Lite 源码分析-作业初始化过程
文章图片
JobScheduler.png 可以看到有个子类, SpringJobScheduler继承了JobScheduler, 本文中从一个例子入手,我们这里的例子在引入Elastic-Job时用的是编码方式,而Elastic-Job其实还提供Spring XML配置的方式来引入,所以此处的SpringJobScheduler就是一个基于Spring的作业启动器。
总结 本文主要结合源码分析了下Elastic-Job的作业初始化过程,通过分析了解到作业初始化需要哪些条件;和ZK之间如何交互;如何对Quartz的Scheduler进行封装等。
Elastic-Job作为分布式调度系统,是建立在Quartz基础上实现分布式的,功能上最大的区别在于Elastic-Job提供了弹性扩容缩容、分片和失效转移等满足分布式集群对于调度任务的需求。
而这些功能都建立在ZK这个分布式一致性协调服务器上,通过在ZK上建立复杂的节点,再结合ZK节点的监听机制,完成这一系列功能。
所以在下一篇文章中,将重点介绍下Elastic-Job在ZK上建立了哪些节点、节点作用以及节点之间的复关系。

    推荐阅读