Spring 官方批处理框架真香!Spring 全家桶永远滴神!

今日长缨在手,何时缚住苍龙。这篇文章主要讲述Spring 官方批处理框架真香!Spring 全家桶永远滴神!相关的知识,希望能为你提供帮助。


推荐一个很多小伙伴没注意到的 Spring 官方的批处理框架。
Spring Batch 是一个轻量级但功能又十分全面的批处理框架,主要用于批处理场景比如从数据库、文件或队列中读取大量记录。不过,需要注意的是:Spring Batch 不是调度框架。商业和开源领域都有许多优秀的企业调度框架比如 Quartz、XXL-JOB、Elastic-Job。它旨在与调度程序一起工作,而不是取代调度程序。
目前,Spring Batch 也已经被收录进了开源项目 awesome-java (非常棒的 Java 开源项目集合)。



项目地址:https://github.com/CodingDocs/awesome-java


Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片

关于 Spring Batch 的详细介绍可以参考 Spring Batch 官方文档[1],入门教程可以参考下面的内容,原文地址:https://mrbird.cc/Spring-Batch 入门.html 。
项目搭建新建一个 Spring Boot 项目,版本为 2.2.4.RELEASE,artifactId 为 spring-batch-start,项目结构如下图所示:
Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片

然后在 ??pom.xml??? 中引入 Spring Batch、mysql 和 JDBC 依赖,引入后 ??pom.xml?? 内容如下所示:
< ?xml version="1.0" encoding="UTF-8"?> < project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> < modelVersion> 4.0.0< /modelVersion> < parent> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-starter-parent< /artifactId> < version> 2.2.5.RELEASE< /version> < relativePath/> < !-- lookup parent from repository --> < /parent> < groupId> cc.mrbird< /groupId> < artifactId> spring-batch-start< /artifactId> < version> 0.0.1-SNAPSHOT< /version> < name> spring-batch-start< /name> < description> Demo project for Spring Boot< /description> < properties> < java.version> 1.8< /java.version> < /properties> < dependencies> < dependency> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-starter-batch< /artifactId> < /dependency> < dependency> < groupId> mysql< /groupId> < artifactId> mysql-connector-java< /artifactId> < /dependency> < dependency> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-starter-jdbc< /artifactId> < /dependency> < /dependencies> < build> < plugins> < plugin> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-maven-plugin< /artifactId> < /plugin> < /plugins> < /build> < /project>

在编写代码之前,我们先来简单了解下 Spring Batch 的组成:
Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片


  • Spring Batch 里最基本的单元就是任务 Job,一个 Job 由若干个步骤 Step 组成。
  • 任务启动器 Job Launcher 负责运行 Job。
  • 任务存储仓库 Job Repository 存储着 Job 的执行状态,参数和日志等信息。Job 处理任务又可以分为三大类:

    • 数据读取 Item Reader
    • 数据中间处理 Item Processor
    • 数据输出 Item Writer。


任务存储仓库可以是关系型数据库 MySQL,非关系型数据库 MongoDB 或者直接存储在内存中,本篇使用的是 MySQL 作为任务存储仓库。
新建一个名称为 springbatch 的 MySQL 数据库,然后导入 ??org.springframework.batch.core??? 目录下的 ??schema-mysql.sql?? 文件:
Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片

Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片

导入后,库表如下图所示:
Spring 官方批处理框架真香!Spring 全家桶永远滴神!

文章图片

然后在项目的配置文件 ??application.yml?? 里添加 MySQL 相关配置:
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/springbatchusername: rootpassword: 123456

接着在 Spring Boot 的入口类上添加 ??@EnableBatchProcessing?? 注解,表示开启 Spring Batch 批处理功能:
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchStartApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchStartApplication.class, args);
}
}

至此,基本框架搭建好了,下面开始配置一个简单的任务。
编写第一个任务在 ??cc.mrbird.batch??? 目录下新建 job 包,然后在该包下新建一个 ??FirstJobDemo?? 类,代码如下所示:
@Component
public class FirstJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job firstJob() {
return jobBuilderFactory.get("firstJob")
.start(step())
.build();
}

private Step step() {
return stepBuilderFactory.get("step")
.tasklet((contribution, chunkContext) -> {
System.out.println("执行步骤....");
return RepeatStatus.FINISHED;
}).build();
}
}

上面代码中,我们注入了??JobBuilderFactory???任务创建工厂和??StepBuilderFactory???步骤创建工厂,分别用于创建任务 Job 和步骤 Step。??JobBuilderFactory???的??get???方法用于创建一个指定名称的任务,??start???方法指定任务的开始步骤,步骤通过??StepBuilderFactory??构建。
步骤 Step 由若干个小任务 Tasklet 组成,所以我们通过??tasklet???方法创建。??tasklet???方法接收一个??Tasklet???类型参数,??Tasklet??是一个函数是接口,源码如下:
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

所以我们可以使用 lambda 表达式创建一个匿名实现:
(contribution, chunkContext) -> {
System.out.println("执行步骤....");
return RepeatStatus.FINISHED;
}

该匿名实现必须返回一个明确的执行状态,这里返回??RepeatStatus.FINISHED??表示该小任务执行成功,正常结束。
此外,需要注意的是,我们配置的任务 Job 必须注册到 Spring IOC 容器中,并且任务的名称和步骤的名称组成唯一。比如上面的例子,我们的任务名称为 firstJob,步骤的名称为 step,如果存在别的任务和步骤组合也叫这个名称的话,则会执行失败。
启动项目,控制台打印日志如下:
...
2020-03-06 11:01:11.785INFO 17324 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=firstJob]] launched with the following parameters: [{}]
2020-03-06 11:01:11.846INFO 17324 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step]
执行步骤....
2020-03-06 11:01:11.886INFO 17324 --- [main] o.s.batch.core.step.AbstractStep: Step: [step] executed in 40ms
2020-03-06 11:01:11.909INFO 17324 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=firstJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 101ms

可以看到,任务成功执行了,数据库的库表也将记录相关运行日志。
重新启动项目,控制台并不会再次打印出任务执行日志,因为 Job 名称和 Step 名称组成唯一,执行完的不可重复的任务,不会再次执行。
多步骤任务一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在 job 包下新建??MultiStepJobDemo??类:
@Component
public class MultiStepJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}

private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
}

上面代码中,我们通过??step1()???、??step2()???和??step3()???三个方法创建了三个步骤。Job 里要使用这些步骤,只需要通过??JobBuilderFactory???的??start???方法指定第一个步骤,然后通过??next??方法不断地指定下一个步骤即可。
启动项目,控制台打印日志如下:
2020-03-06 13:52:52.188INFO 18472 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=multiStepJob]] launched with the following parameters: [{}] 2020-03-06 13:52:52.222INFO 18472 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step1] 执行步骤一操作。。。 2020-03-06 13:52:52.251INFO 18472 --- [main] o.s.batch.core.step.AbstractStep: Step: [step1] executed in 29ms 2020-03-06 13:52:52.292INFO 18472 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step2] 执行步骤二操作。。。 2020-03-06 13:52:52.323INFO 18472 --- [main] o.s.batch.core.step.AbstractStep: Step: [step2] executed in 30ms 2020-03-06 13:52:52.375INFO 18472 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step3] 执行步骤三操作。。。 2020-03-06 13:52:52.405INFO 18472 --- [main] o.s.batch.core.step.AbstractStep: Step: [step3] executed in 29ms 2020-03-06 13:52:52.428INFO 18472 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=multiStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 231ms

三个步骤依次执行成功。
多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:
@Component
public class MultiStepJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob2")
.start(step1())
.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
.from(step2())
.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
.from(step3()).end()
.build();
}

private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
}

??multiStepJob()???方法的含义是:multiStepJob2 任务先执行 step1,当 step1 状态为完成时,接着执行 step2,当 step2 的状态为完成时,接着执行 step3。??ExitStatus.COMPLETED??常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:
public class ExitStatus implements Serializable, Comparable< ExitStatus> {

/**
* Convenient constant value representing unknown state - assumed not
* continuable.
*/
public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");

/**
* Convenient constant value representing continuable state where processing
* is still taking place, so no further action is required. Used for
* asynchronous execution scenarios where the processing is happening in
* another thread or process and the caller is not required to wait for the
* result.
*/
public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");

/**
* Convenient constant value representing finished processing.
*/
public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");

/**
* Convenient constant value representing job that did no processing (e.g.
* because it was already complete).
*/
public static final ExitStatus NOOP = new ExitStatus("NOOP");

/**
* Convenient constant value representing finished processing with an error.
*/
public static final ExitStatus FAILED = new ExitStatus("FAILED");

/**
* Convenient constant value representing finished processing with
* interrupted status.
*/
public static final ExitStatus STOPPED = new ExitStatus("STOPPED");

...
}

启动项目,控制台日志打印如下:
2020-03-06 14:21:49.384INFO 18745 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=multiStepJob2]] launched with the following parameters: [{}] 2020-03-06 14:21:49.427INFO 18745 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step1] 执行步骤一操作。。。 2020-03-06 14:21:49.456INFO 18745 --- [main] o.s.batch.core.step.AbstractStep: Step: [step1] executed in 29ms 2020-03-06 14:21:49.501INFO 18745 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step2] 执行步骤二操作。。。 2020-03-06 14:21:49.527INFO 18745 --- [main] o.s.batch.core.step.AbstractStep: Step: [step2] executed in 26ms 2020-03-06 14:21:49.576INFO 18745 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step3] 执行步骤三操作。。。 2020-03-06 14:21:49.604INFO 18745 --- [main] o.s.batch.core.step.AbstractStep: Step: [step3] executed in 28ms 2020-03-06 14:21:49.629INFO 18745 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=multiStepJob2]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 238ms

Flow 的用法Flow 的作用就是可以将多个步骤 Step 组合在一起然后再组装到任务 Job 中。举个 Flow 的例子,在 job 包下新建??FlowJobDemo??类:
@Component
public class FlowJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job flowJob() {
return jobBuilderFactory.get("flowJob")
.start(flow())
.next(step3())
.end()
.build();
}

private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

// 创建一个flow对象,包含若干个step
private Flow flow() {
return new FlowBuilder< Flow> ("flow")
.start(step1())
.next(step2())
.build();
}
}

上面代码中,我们通过??FlowBuilder???将 step1 和 step2 组合在一起,创建了一个名为 flow 的 Flow,然后再将其赋给任务 Job。使用 Flow 和 Step 构建 Job 的区别是,Job 流程中包含 Flow 类型的时候需要在??build()???方法前调用??end()??方法。
启动程序,控制台日志打印如下:
2020-03-06 14:36:42.621INFO 18865 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=flowJob]] launched with the following parameters: [{}] 2020-03-06 14:36:42.667INFO 18865 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step1] 执行步骤一操作。。。 2020-03-06 14:36:42.697INFO 18865 --- [main] o.s.batch.core.step.AbstractStep: Step: [step1] executed in 30ms 2020-03-06 14:36:42.744INFO 18865 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step2] 执行步骤二操作。。。 2020-03-06 14:36:42.771INFO 18865 --- [main] o.s.batch.core.step.AbstractStep: Step: [step2] executed in 27ms 2020-03-06 14:36:42.824INFO 18865 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step3] 执行步骤三操作。。。 2020-03-06 14:36:42.850INFO 18865 --- [main] o.s.batch.core.step.AbstractStep: Step: [step3] executed in 25ms 2020-03-06 14:36:42.874INFO 18865 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=flowJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 245ms

并行执行任务中的步骤除了可以串行执行(一个接着一个执行)外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率。
将任务并行化只需两个简单步骤:

  1. 将步骤 Step 转换为 Flow;
  2. 任务 Job 中指定并行 Flow。

举个例子,在 job 包下新建??SplitJobDemo??类:
@Component
public class SplitJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job splitJob() {
return jobBuilderFactory.get("splitJob")
.start(flow1())
.split(new SimpleAsyncTaskExecutor()).add(flow2())
.end()
.build();

}

private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Flow flow1() {
return new FlowBuilder< Flow> ("flow1")
.start(step1())
.next(step2())
.build();
}

private Flow flow2() {
return new FlowBuilder< Flow> ("flow2")
.start(step3())
.build();
}
}

上面例子中,我们创建了两个 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通过??JobBuilderFactory???的??split??方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)。
启动项目,控制台日志打印如下:
2020-03-06 15:25:43.602INFO 19449 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=splitJob]] launched with the following parameters: [{}] 2020-03-06 15:25:43.643INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler: Executing step: [step3] 2020-03-06 15:25:43.650INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler: Executing step: [step1] 执行步骤三操作。。。 执行步骤一操作。。。 2020-03-06 15:25:43.673INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep: Step: [step1] executed in 23ms 2020-03-06 15:25:43.674INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep: Step: [step3] executed in 31ms 2020-03-06 15:25:43.714INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler: Executing step: [step2] 执行步骤二操作。。。 2020-03-06 15:25:43.738INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep: Step: [step2] executed in 24ms 2020-03-06 15:25:43.758INFO 19449 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=splitJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 146ms

可以看到 step3 并没有在 step2 后才执行,说明步骤已经是并行化的(开启并行化后,并行的步骤执行顺序并不能 100%确定,因为线程调度具有不确定性)。
任务决策器决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。
使用决策器前,我们需要自定义一个决策器的实现。在 cc.mrbird.batch 包下新建 decider 包,然后创建??MyDecider???类,实现??JobExecutionDecider??接口:
@Component
public class MyDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
LocalDate now = LocalDate.now();
DayOfWeek dayOfWeek = now.getDayOfWeek();

if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
return new FlowExecutionStatus("weekend");
} else {
return new FlowExecutionStatus("workingDay");
}
}
}

??MyDecider???实现??JobExecutionDecider???接口的??decide???方法,该方法返回??FlowExecutionStatus???。上面的逻辑是:判断今天是否是周末,如果是,返回??FlowExecutionStatus("weekend")???状态,否则返回??FlowExecutionStatus("workingDay")??状态。
下面演示如何在任务 Job 里使用决策器。在 job 包下新建??DeciderJobDemo??:
@Component
public class DeciderJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyDecider myDecider;

@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(step1())
.next(myDecider)
.from(myDecider).on("weekend").to(step2())
.from(myDecider).on("workingDay").to(step3())
.from(step3()).on("*").to(step4())
.end()
.build();
}

private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}


private Step step4() {
return stepBuilderFactory.get("step4")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤四操作。。。");
return RepeatStatus.FINISHED;
}).build();
}

上面代码中,我们注入了自定义决策器??MyDecider???,然后在??jobDecider()??方法里使用了该决策器:
@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(step1())
.next(myDecider)
.from(myDecider).on("weekend").to(step2())
.from(myDecider).on("workingDay").to(step3())
.from(step3()).on("*").to(step4())
.end()
.build();
}

这段代码的含义是:任务 deciderJob 首先执行 step1,然后指定自定义决策器,如果决策器返回 weekend,那么执行 step2,如果决策器返回 workingDay,那么执行 step3。如果执行了 step3,那么无论 step3 的结果是什么,都将执行 step4。
启动项目,控制台输出如下所示:
2020-03-06 16:09:10.541INFO 19873 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=deciderJob]] launched with the following parameters: [{}] 2020-03-06 16:09:10.609INFO 19873 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step1] 执行步骤一操作。。。 2020-03-06 16:09:10.641INFO 19873 --- [main] o.s.batch.core.step.AbstractStep: Step: [step1] executed in 32ms 2020-03-06 16:09:10.692INFO 19873 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step3] 执行步骤三操作。。。 2020-03-06 16:09:10.723INFO 19873 --- [main] o.s.batch.core.step.AbstractStep: Step: [step3] executed in 31ms 2020-03-06 16:09:10.769INFO 19873 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [step4] 执行步骤四操作。。。 2020-03-06 16:09:10.797INFO 19873 --- [main] o.s.batch.core.step.AbstractStep: Step: [step4] executed in 27ms 2020-03-06 16:09:10.818INFO 19873 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=deciderJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 256ms

因为今天是 2020 年 03 月 06 日星期五,是工作日,所以任务执行了 step1、step3 和 step4。
任务嵌套任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。
举个例子,在 job 包下新建??NestedJobDemo??类:
@Component
public class NestedJobDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager platformTransactionManager;

// 父任务
@Bean
public Job parentJob() {
return jobBuilderFactory.get("parentJob")
.start(childJobOneStep())
.next(childJobTwoStep())
.build();
}


// 将任务转换为特殊的步骤
private Step childJobOneStep() {
return new JobStepBuilder(new StepBuilder("childJobOneStep"))
.job(childJobOne())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}

// 将任务转换为特殊的步骤
private Step childJobTwoStep() {
return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
.job(childJobTwo())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}

// 子任务一
private Job childJobOne() {
return jobBuilderFactory.get("childJobOne")
.start(
stepBuilderFactory.get("childJobOneStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务一执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}

// 子任务二
private Job childJobTwo() {
return jobBuilderFactory.get("childJobTwo")
.start(
stepBuilderFactory.get("childJobTwoStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务二执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}
}

上面代码中,我们通过??childJobOne()???和??childJobTwo()???方法创建了两个任务 Job,这里没什么好说的,前面都介绍过。关键在于??childJobOneStep()???方法和??childJobTwoStep()???方法。在??childJobOneStep()???方法中,我们通过??JobStepBuilder???构建了一个名称为??childJobOneStep??的 Step,顾名思义,它是一个任务型 Step 的构造工厂,可以将任务转换为“特殊”的步骤。在构建过程中,我们还需要传入任务执行器 JobLauncher、任务仓库 JobRepository 和事务管理器 PlatformTransactionManager。
将任务转换为特殊的步骤后,将其赋给父任务 parentJob 即可,流程和前面介绍的一致。
配置好后,启动项目,控制台输出如下所示:
2020-03-06 16:58:39.771INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=parentJob]] launched with the following parameters: [{}] 2020-03-06 16:58:39.812INFO 21588 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [childJobOneStep] 2020-03-06 16:58:39.866INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=childJobOne]] launched with the following parameters: [{}] 2020-03-06 16:58:39.908INFO 21588 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [childJobOneStep] 子任务一执行步骤。。。 2020-03-06 16:58:39.940INFO 21588 --- [main] o.s.batch.core.step.AbstractStep: Step: [childJobOneStep] executed in 32ms 2020-03-06 16:58:39.960INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=childJobOne]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 86ms 2020-03-06 16:58:39.983INFO 21588 --- [main] o.s.batch.core.step.AbstractStep: Step: [childJobOneStep] executed in 171ms 2020-03-06 16:58:40.019INFO 21588 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [childJobTwoStep] 2020-03-06 16:58:40.067INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=childJobTwo]] launched with the following parameters: [{}] 2020-03-06 16:58:40.102INFO 21588 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [childJobTwoStep] 子任务二执行步骤。。。 2020-03-06 16:58:40.130INFO 21588 --- [main] o.s.batch.core.step.AbstractStep: Step: [childJobTwoStep] executed in 28ms 2020-03-06 16:58:40.152INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=childJobTwo]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 75ms 2020-03-06 16:58:40.157INFO 21588 --- [main] o.s.batch.core.step.AbstractStep: Step: [childJobTwoStep] executed in 138ms 2020-03-06 16:58:40.177INFO 21588 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [SimpleJob: [name=parentJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 398ms

【Spring 官方批处理框架真香!Spring 全家桶永远滴神!】


    推荐阅读