动态配置的|动态配置的 Schedule 设计

1. 背景

定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。
你可以使用 Linux 的 Crontab 启动应用程序进行处理,或者直接使用 Spring 的 Schedule 对任务进行调度,还可以使用分布式调度系统,如果 xxl-job 等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:
  1. 新建一组任务,周期性的执行指定 SQL 并将结果以邮件的方式发送给特定人群;
  2. 比较方便的对任务进行管理,比如 启动、停止,修改调度周期等;
  3. 动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从一下几部分进行讨论:
  1. Spring Schedule 配置和使用。首先我们将介绍 Demo 的骨架,并基于 Spring-Boot 完成 Schedule 的配置;
  2. 数据库定时轮询方案。使用 Spring Schedule 定时轮询 数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;
  3. 基于 TaskScheduler 动态配置方案。基于数据库 轮询 或 配置中心 两种方案动态的对 Spring TaskScheduler 进行配置,以实现动态管理任务的目的;
  4. 我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2. Spring Schedule
Spring Boot 上的 Schedule 的使用非常简单,无需增加新的依赖,只需简单配置即可。
  1. 使用 @EnableScheduling 启用 Schedule;
  2. 在要调度的方法上增加 @Scheduled;
首先,我们需要在启动类上添加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮我们完成最基本的配置。
@SpringBootApplication @EnableScheduling public class ConfigurableScheduleDemoApplication {public static void main(String[] args) { SpringApplication.run(ConfigurableScheduleDemoApplication.class, args); }}

启用Schedule配置之后,在需要被调度的方法上增加 @Scheduled 注解。
@Service public class SpringScheduleService { @Autowired private TaskService taskService; @Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000) public void runTask(){ TaskConfig taskConfig = TaskConfig.builder() .name("Spring Default Schedule") .build(); this.taskService.runTask(taskConfig); } }

runTask 任务延迟 1s 进行初始化,并以 5s 为间隔进行调度。
Scheduled 注解类的详细配置如下:
配置 含义 样例
cron linux crontab 表达式 @Scheduled(cron="/5 * MON-FRI") 工作日,每 5 s 调度一次
fixedDelay 固定间隔,上次运行结束,与下次启动运行,相隔固定时长 @Scheduled(fixedDelay=5000) 运行结束后,5S 后启动一次调度
fixedDelayString 与 fixedDelay 一致
fixedRate 固定周期,前后两次运行相隔固定的时长 @Scheduled(fixedRate=5000) 前后两个任务,间隔 5 秒
fixedRateString 与 fixedRate 一致
initialDelay 第一次执行,间隔时间 @Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,以后以 5 秒为周期进行调度
initialDelayString 与 initialDelay 一致
环境搭建完成,让我们开始第一个方案。
3. 数据库定时轮询
使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1. 串行执行方案 整体思路非常简单,流程如下:
动态配置的|动态配置的 Schedule 设计
文章图片

主要分如下几步:
  1. 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
  2. 根据数据库的任务配置信息,依次遍历并执行任务;
  3. 任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;
  4. 等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000) public void loadAndRunTask(){ Date now = new Date(); // 加载需要运行的任务: // 1. 状态为 ENABLE // 2. 下一次运行时间 小于 当前时间 List shouldRunTasks = loadShouldRunTasks(now); // 依次遍历待运行任务,执行对于的任务 for (TaskDefinitionV2 task : shouldRunTasks){ // Double Check if (task.shouldRun(now)){ // 执行任务 runTask(task); // 更新任务的下一次运行时间 updateNextRunTime(task, now); } } }

方案简单但非常有效,那该方案存在哪些问题呢?
最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被 delay。
例如,依次加载了待执行任务 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,由于三个任务串行执行,task2 将延时 5 秒,task3 延时 10秒;下一轮检查距上次启动相差 12 秒。
究其根本,核心问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2. 并行执行方案 【动态配置的|动态配置的 Schedule 设计】整体执行流程如下:
动态配置的|动态配置的 Schedule 设计
文章图片

相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
  1. 步骤一不变,在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
  2. 依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;
  3. 任务在线程池中运行,结束后更新下一次的运行时间;
  4. 调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;
核心代码如下:
Spring 调度任务,每 1 秒运行一次:
@Scheduled(fixedDelay = 1000, initialDelay = 1000) public void loadAndRunTask(){ Date now = new Date(); // 加载所有待运行的任务 // 1. 状态为 ENABLE // 2. 下一次运行时间小于 当前时间 List shouldRunTasks = loadShouldRunTasks(now); // 遍历待运行任务 for (TaskDefinitionV2 task : shouldRunTasks){ // 1. 根据 Task Id 获取任务对应的线程池 // 2. 将任务提交至线程池中 this.executorServiceForTask(task.getId()) .submit(new TaskRunner(task.getId())); } }

自定义线程池,每个线程池最多只有一个线程,空闲超过 10 秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
private ExecutorService executorServiceForTask(Long taskId){ return this.executorServiceRegistry.computeIfAbsent(taskId, id->{ BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() // 指定线程池名称 .namingPattern("Async-Task-"+ taskId +"-Thread-%d") // 设置线程为 后台线程 .daemon(true) .build(); // 线程池核心配置: // 1. 每个线程池最多只有一个线程 // 2. 线程空闲超过 10秒 进行自动回收 // 3. 直接使用交互器,线程空闲进行任务交互 // 4. 使用指定的线程工厂,设置线性名称 // 5. 线程池饱和,自动丢弃最老的任务 return new ThreadPoolExecutor(0, 1, 10L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy() ); }); }

最后,在线程池中运行的 Task 如下:
private class TaskRunner implements Runnable { private final Date now = new Date(); private final Long taskId; public TaskRunner(Long taskId) { this.taskId = taskId; }@Override public void run() { // 重新加载任务,保持最新的任务状态 TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null); if (task != null && task.shouldRun(now)){ // 运行任务 runTask(task); // 更新任务的下一次运行时间 updateNextRunTime(task, now); } } }

4. TaskScheduler 配置方案
该方案的核心为:绕过 @Schedule 注解,直接对 Spring 底层核心 类 TaskScheduler 进行配置。
TaskScheduler 接口是 Spring 对调度任务的一个抽象,更是 @Schedule 背后默默的支持者,首先我们看下这个接口定义。
public interface TaskScheduler {ScheduledFuture schedule(Runnable task, Trigger trigger); ScheduledFuture schedule(Runnable task, Instant startTime); ScheduledFuture schedule(Runnable task, Date startTime); ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period); ScheduledFuture scheduleAtFixedRate(Runnable task, long period); ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay); ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay); ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); }

满满的都是 schedule 接口,其他的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:
public interface Trigger { Date nextExecutionTime(TriggerContext triggerContext); }

只有一个方法,获取下次执行的时间。在任务执行完成后,会调用 Trigger 的 nextExecutionTime 获取下一次运行时间,从而实现周期性调度。
CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的方式配置调度任务,如:
scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));

基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1 数据库动态配置方案 整体设计如下:
动态配置的|动态配置的 Schedule 设计
文章图片

仍旧是轮询数据库方式,详细流程如下:
  1. 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取所有任务;
  2. 依次遍历任务,与内存中的 TaskEntry(任务与状态) 进行比对,动态的向 TaskScheduler 中 添加 或 取消 调度任务;
  3. 由 TaskScheduler 负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000) public void loadAndConfig(){ // 加载所有的任务信息 List tasks = repository.findAll(); // 遍历任务进行任务检查 for (TaskDefinitionV3 task : tasks){ // 获取内存任务状态 TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new); if (task.isEnable() && taskEntry.isStop()){ // 任务为可用,运行状态为停止,则重新进行 schedule 注册 ScheduledFuture scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000); taskEntry.setScheduledFuture(scheduledFuture); log.info("success to start schedule task for {}", task); }else if (task.isDisable() && taskEntry.isRunning()){ // 任务为禁用,运行状态为运行中,停止正在运行在任务 taskEntry.stop(); log.info("success to stop schedule task for {}", task); } } }

核心辅助类:
@Data private class TaskEntry{ private final Long taskId; private ScheduledFuture scheduledFuture; private TaskEntry(Long taskId) { this.taskId = taskId; }/** * 内存状态 scheduledFuture 为 null,则没有运行的任务 * @return */ public boolean isStop() { return scheduledFuture == null; }/** * 内存状态 scheduledFuture 不为 null,则存在运行的任务 * @return */ public boolean isRunning() { return scheduledFuture != null; }/** * 停止调度任务
* 1. 内存状态设置为 null * 2. 调用 scheduledFuture#cancel 终止正在运行的调度任务 */ public void stop() { ScheduledFuture scheduledFuture = this.scheduledFuture; this.scheduledFuture = null; scheduledFuture.cancel(true); } }

有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2 配置中心通知方案 整体设计如下:
动态配置的|动态配置的 Schedule 设计
文章图片

核心流程如下:
  1. 应用启动时,从配置中心中获取 调度的配置信息,并完成对 TaskScheduler 的配置;
  2. 当配置发送变化时,配置中心会主动将配置推送到 应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;
  3. 任务的实际调度仍旧由 TaskScheduler 完成。
由于手底下没有配置中心,暂时没有 coding,思路很简单,有条件的同学可以自行完成。
5. 分布式环境下应用
以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现 冗余部署 和 自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用 Redis 的分布式锁。
5.1. 环境搭建
Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,我们将使用 RLock 对应用进行保护。
首先,在 pom 中引入 Redisson Starter。
org.redisson redisson-spring-boot-starter 3.16.2

然后,在 application.properties 文件中增加 Redis 配置,具体如下:
spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.database=0

5.2 引入分布式锁 最后,就可以直接使用 分布式锁 对任务执行进行保护了,代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000) public void loadAndRunTaskWithLock(){ Date now = new Date(); // 加载需要运行的任务: // 1. 状态为 ENABLE // 2. 下一次运行时间 小于 当前时间 List shouldRunTasks = loadShouldRunTasks(now); // 依次遍历待运行任务,执行对于的任务 for (TaskDefinitionV2 task : shouldRunTasks){ // Double Check if (task.shouldRun(now)){// 获取分布式锁,用于保证每个任务只能有一个正在运行 RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId()); if (lock.tryLock()) { // 成功获取锁,运行任务 try { log.info("Success to get Lock, begin to run task {}", task.getId()); // 执行任务 runTask(task); // 更新任务的下一次运行时间 updateNextRunTime(task, now); log.info("Success to run task {}", task.getId()); }finally { // 任务运行解释,释放锁 lock.unlock(); } }else { // 未获取锁,打印日志,不做额外处理 log.info("Failed to get Lock!!!!"); } } } }

备注:
Redis 是典型的 AP 应用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用 CP 实现,如 Zookeeper 或 etcd 等。
6. 小结
本文从 Spring 的 Schedule 出发,依次对数据库轮询方案、TaskScheduler 配置方案 进行详细讲解,以实现对调度任务的可配置化。最后,使用 Redis 分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
最后,附上源码 源码

    推荐阅读