知识就是力量,时间就是生命。这篇文章主要讲述SpringBoot技术专题「开发实战系列」动态化Quartz任务调度机制+实时推送任务数据到前端的相关知识,希望能为你提供帮助。
前提介绍
动态化任务调度
添加依赖包
<!-- Quartz scheduler support via the Spring Boot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
DynamicSchedulerConfig
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
 * Adjusts the Quartz {@link SchedulerFactoryBean} before the scheduler is created.
 */
@Configuration
public class DynamicSchedulerConfig implements SchedulerFactoryBeanCustomizer {

    /**
     * Enables overwriting of existing jobs, turns on automatic startup and sets
     * the startup delay on the given factory bean.
     *
     * @param schedulerFactoryBean the factory bean to customize
     */
    @Override
    public void customize(SchedulerFactoryBean schedulerFactoryBean) {
        // Job definitions registered under an existing name replace the stored ones.
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        // Start the scheduler automatically ...
        schedulerFactoryBean.setAutoStartup(true);
        // ... after a delay of 2 (seconds, per Spring's SchedulerFactoryBean documentation).
        schedulerFactoryBean.setStartupDelay(2);
    }
}
application.yml配置
server:
  port: 8101

# The default profile is dev; other environments select a profile via a startup argument, e.g.:
#   test:       java -jar quartz-service.jar --spring.profiles.active=test
#   production: java -jar quartz-service.jar --spring.profiles.active=prod
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    # Druid connection pool; the settings below configure the pooled data source
    url: jdbc:mysql://127.0.0.1:3306/task?useUnicode=true&characterEncoding=utf-8&useSSL=false
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456
  # Quartz properties
  quartz:
    properties:
      org:
        quartz:
          scheduler:
            instanceName: clusteredScheduler
            instanceId: AUTO
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_
            isClustered: true
            clusterCheckinInterval: 10000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
    # Persist jobs in the database
    job-store-type: jdbc

# MyBatis-Plus configuration
mybatis-plus:
  # Mapper XML files live under resources/mapper; the original note says that placing
  # them under com.XX.mapper.* may cause lookup problems (presumably "not found").
  mapper-locations: classpath*:/mapper/**Mapper.xml
  # Package of the entity classes to scan; separate multiple packages with commas or semicolons
  typeAliasesPackage: com.task.entity
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false

logging:
  file: task-info.log
  level:
    com.task: debug
TaskInfoService业务服务类
/**
 * Service contract for querying and managing scheduled tasks.
 *
 * <p>Declared as an interface: its methods have no bodies and the implementation
 * uses {@code implements TaskInfoService}. The stereotype annotation therefore
 * belongs on the implementing class, not here.
 */
public interface TaskInfoService extends IService<TaskInfoBO> {

    /**
     * Queries scheduled tasks, paginated.
     *
     * @param pageable   the requested page
     * @param queryParam the query parameters of the request
     * @return one page of task information
     */
    IPage<TaskInfoBO> getPageJob(Pageable pageable, MultiValueMap queryParam);

    /**
     * Queries a scheduled task.
     *
     * @return the task information
     */
    TaskInfoBO getPageJobmod();

    /**
     * Adds a task.
     *
     * @param jobClassName   class path name of the task
     * @param jobGroupName   task group
     * @param cronExpression cron schedule rule
     * @throws Exception if the task cannot be added
     */
    void addJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception;

    /**
     * Adds a dynamic task with a description and parameters.
     *
     * @param jobClassName   class path name of the task
     * @param jobGroupName   task group
     * @param cronExpression cron schedule rule
     * @param jobDescription description of the task
     * @param params         parameters passed to the task
     * @throws Exception if the task cannot be added
     */
    void addJob(String jobClassName, String jobGroupName, String cronExpression, String jobDescription,
            Map<String, Object> params) throws Exception;

    /**
     * Updates the schedule of a task.
     *
     * @param jobClassName   class path name of the task
     * @param jobGroupName   task group
     * @param cronExpression new cron schedule rule
     * @throws Exception if the task cannot be updated
     */
    void updateJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception;

    /**
     * Deletes a task.
     *
     * @param jobClassName class path name of the task
     * @param jobGroupName task group
     * @throws Exception if the task cannot be deleted
     */
    void deleteJob(String jobClassName, String jobGroupName) throws Exception;

    /**
     * Pauses a task.
     *
     * @param jobClassName class path name of the task
     * @param jobGroupName task group
     * @throws Exception if the task cannot be paused
     */
    void pauseJob(String jobClassName, String jobGroupName) throws Exception;

    /**
     * Resumes a task.
     *
     * @param jobClassName class path name of the task
     * @param jobGroupName task group
     * @throws Exception if the task cannot be resumed
     */
    void resumejob(String jobClassName, String jobGroupName) throws Exception;
}
TaskInfoServiceImpl业务服务类
@Slf4j
@Service
@Transactional
public class TaskInfoServiceImplextends ServiceImpl<
JobAndTriggerMapper, TaskInfoBO>
implements TaskInfoService {@Autowiredprivate Scheduler scheduler;
@Overridepublic IPage<
TaskInfoBO>
getPageJob(Pageable pageable, MultiValueMap queryParam) {IPage<
TaskInfoBO>
page = new Page<
>
(pageable.getPageNumber(), pageable.getPageSize());
return baseMapper.getJobAndTriggerDetails(page);
}@Overridepublic TaskInfoBO getPageJobmod() {
return baseMapper.getJobAndTriggerDto();
}@Overridepublic void addJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception {
// 启动调度器
scheduler.start();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
.withIdentity(jobClassName, jobGroupName).build();
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder).build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
throw new Exception("创建定时任务失败");
}
}@Override
public void addJob(String jobClassName, String jobGroupName, String cronExpression, String jobDescription,
Map<
String, Object>
params) throws Exception {
// 启动调度器
scheduler.start();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(TaskInfoServiceImpl.getClass(jobClassName).getClass())
.withIdentity(jobClassName, jobGroupName).withDescription(jobDescription).build();
Iterator<
Map.Entry<
String, Object>
>
var7 = params.entrySet().iterator();
while(var7.hasNext()) {
Map.Entry<
String, Object>
entry = var7.next();
jobDetail.getJobDataMap().put((String)entry.getKey(), entry.getValue());
}
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder).build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
throw new Exception("创建定时任务失败");
}
}@Overridepublic void updateJob(String jobClassName, String jobGroupName, String cronExpression) throws Exception {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobClassName, jobGroupName);
// 表达式调度构建器(动态修改后不立即执行)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
throw new Exception("更新定时任务失败");
}
}@Overridepublic void deleteJob(String jobClassName, String jobGroupName) throws Exception {
scheduler.pauseTrigger(TriggerKey.triggerKey(jobClassName, jobGroupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobClassName, jobGroupName));
scheduler.deleteJob(JobKey.jobKey(jobClassName, jobGroupName));
}@Override
public void pauseJob(String jobClassName, String jobGroupName) throws Exception {
scheduler.pauseJob(JobKey.jobKey(jobClassName, jobGroupName));
}@Overridepublic void resumejob(String jobClassName, String jobGroupName) throws Exception {
scheduler.resumeJob(JobKey.jobKey(jobClassName, jobGroupName));
}public static BaseJob getClass(String classname) throws Exception {
Class<
?>
class1 = Class.forName(classname);
return (BaseJob) class1.newInstance();
}
}
TaskInfoMapper
public interface JobAndTriggerMapper extends BaseMapper<
TaskInfoBO>
{
IPage<
TaskInfoBO>
getJobAndTriggerDetails(IPage<
TaskInfoBO>
page);
TaskInfoBO getTaskInfoModel;
}
TaskInfoMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.south.data.mapper.JobAndTriggerMapper">

    <!-- NOTE(review): the namespace and the two resultType values use different base
         packages and different types; confirm them against the actual project classes. -->

    <!-- Job, trigger and cron-trigger details; fire times are converted from epoch milliseconds. -->
    <select id="getJobAndTriggerDetails" resultType="com.data.vo.TaskInfoBO">
        SELECT
            jd.JOB_NAME AS jobName,
            jd.DESCRIPTION AS jobDescription,
            jd.JOB_GROUP AS jobGroupName,
            jd.JOB_CLASS_NAME AS jobClassName,
            t.TRIGGER_NAME AS triggerName,
            t.TRIGGER_GROUP AS triggerGroupName,
            FROM_UNIXTIME(t.PREV_FIRE_TIME/1000, '%Y-%m-%d %T') AS prevFireTime,
            FROM_UNIXTIME(t.NEXT_FIRE_TIME/1000, '%Y-%m-%d %T') AS nextFireTime,
            ct.CRON_EXPRESSION AS cronExpression,
            t.TRIGGER_STATE AS triggerState
        FROM qrtz_job_details jd
        JOIN qrtz_triggers t
        JOIN qrtz_cron_triggers ct
            ON jd.JOB_NAME = t.JOB_NAME
            AND t.TRIGGER_NAME = ct.TRIGGER_NAME
            AND t.TRIGGER_GROUP = ct.TRIGGER_GROUP
    </select>

    <!-- Same query as above, mapped to a single object.
         NOTE(review): nothing limits this query to one row; verify the caller's expectation. -->
    <select id="getJobAndTriggerDto" resultType="com.south.data.vo.JobAndTriggerDto">
        SELECT
            jd.JOB_NAME AS jobName,
            jd.DESCRIPTION AS jobDescription,
            jd.JOB_GROUP AS jobGroupName,
            jd.JOB_CLASS_NAME AS jobClassName,
            t.TRIGGER_NAME AS triggerName,
            t.TRIGGER_GROUP AS triggerGroupName,
            FROM_UNIXTIME(t.PREV_FIRE_TIME/1000, '%Y-%m-%d %T') AS prevFireTime,
            FROM_UNIXTIME(t.NEXT_FIRE_TIME/1000, '%Y-%m-%d %T') AS nextFireTime,
            ct.CRON_EXPRESSION AS cronExpression,
            t.TRIGGER_STATE AS triggerState
        FROM qrtz_job_details jd
        JOIN qrtz_triggers t
        JOIN qrtz_cron_triggers ct
            ON jd.JOB_NAME = t.JOB_NAME
            AND t.TRIGGER_NAME = ct.TRIGGER_NAME
            AND t.TRIGGER_GROUP = ct.TRIGGER_GROUP
    </select>
</mapper>
BaseJob类
/**
 * Common base type for the jobs in this project; it narrows nothing and simply
 * re-declares the execution method inherited from {@link Job}.
 */
public interface BaseJob extends Job {

    /**
     * Runs the job.
     *
     * @param context the execution context supplied for this run
     * @throws JobExecutionException if the job fails
     */
    void execute(JobExecutionContext context) throws JobExecutionException;
}
TaskController控制器
/**
 * REST endpoints under {@code /job} for listing, adding, pausing, resuming,
 * rescheduling and deleting scheduled jobs. All work is delegated to
 * {@link TaskInfoService}.
 */
@RestController
@RequestMapping(value = "/job")
public class JobController {

    private final TaskInfoService taskInfoService;

    /**
     * Creates the controller with its service dependency (constructor injection).
     *
     * @param taskInfoService the service that performs the job operations
     */
    @Autowired
    public JobController(TaskInfoService taskInfoService) {
        this.taskInfoService = taskInfoService;
    }

    /**
     * Returns one page of jobs; pagination headers are generated from the page.
     *
     * @param pageable    the requested page
     * @param queryParams all request parameters
     * @param uriBuilder  builder used to create the pagination links
     * @return the records of the page, with pagination headers
     */
    @PostMapping(value = "/page")
    public ResponseEntity<List<TaskInfoBO>> queryjob(Pageable pageable,
            @RequestParam MultiValueMap<String, String> queryParams,
            UriComponentsBuilder uriBuilder) {
        IPage<TaskInfoBO> page = taskInfoService.getPageJob(pageable, queryParams);
        HttpHeaders headers =
                PaginationUtil.generatePaginationHttpHeaders(uriBuilder.queryParams(queryParams), page);
        return ResponseEntity.ok().headers(headers).body(page.getRecords());
    }

    /**
     * Adds a job.
     *
     * @param jobClassName   class name
     * @param jobGroupName   group name
     * @param cronExpression cron expression, e.g. {@code 0/5 * * * * ?} (every 5 seconds)
     * @return the outcome message
     */
    @PostMapping(value = "/add")
    public ResponseEntity<String> addJob(
            @RequestParam(value = "jobClassName") String jobClassName,
            @RequestParam(value = "jobGroupName") String jobGroupName,
            @RequestParam(value = "cronExpression") String cronExpression) {
        return respond(() -> taskInfoService.addJob(jobClassName, jobGroupName, cronExpression));
    }

    /**
     * Pauses a job.
     *
     * @param jobClassName class name
     * @param jobGroupName group name
     * @return the outcome message
     */
    @PostMapping(value = "/pause")
    public ResponseEntity<String> pauseJob(
            @RequestParam(value = "jobClassName") String jobClassName,
            @RequestParam(value = "jobGroupName") String jobGroupName) {
        return respond(() -> taskInfoService.pauseJob(jobClassName, jobGroupName));
    }

    /**
     * Resumes a job.
     *
     * @param jobClassName class name
     * @param jobGroupName group name
     * @return the outcome message
     */
    @PostMapping(value = "/resume")
    public ResponseEntity<String> resumeJob(
            @RequestParam(value = "jobClassName") String jobClassName,
            @RequestParam(value = "jobGroupName") String jobGroupName) {
        return respond(() -> taskInfoService.resumejob(jobClassName, jobGroupName));
    }

    /**
     * Changes the cron schedule of a job.
     *
     * @param jobClassName   class name
     * @param jobGroupName   group name
     * @param cronExpression new cron expression
     * @return the outcome message
     */
    @PostMapping(value = "/reschedule")
    public ResponseEntity<String> rescheduleJob(
            @RequestParam(value = "jobClassName") String jobClassName,
            @RequestParam(value = "jobGroupName") String jobGroupName,
            @RequestParam(value = "cronExpression") String cronExpression) {
        return respond(() -> taskInfoService.updateJob(jobClassName, jobGroupName, cronExpression));
    }

    /**
     * Deletes a job.
     *
     * @param jobClassName class name
     * @param jobGroupName group name
     * @return the outcome message
     */
    @RequestMapping(value = "/del", method = RequestMethod.POST)
    public ResponseEntity<String> deleteJob(
            @RequestParam(value = "jobClassName") String jobClassName,
            @RequestParam(value = "jobGroupName") String jobGroupName) {
        return respond(() -> taskInfoService.deleteJob(jobClassName, jobGroupName));
    }

    /** A service call that may throw a checked exception. */
    @FunctionalInterface
    private interface JobOperation {
        void run() throws Exception;
    }

    /**
     * Runs the operation and maps the outcome to the success or failure message.
     */
    private static ResponseEntity<String> respond(JobOperation operation) {
        try {
            operation.run();
            return ResponseEntity.ok().body("操作成功");
        } catch (Exception e) {
            // NOTE(review): failures are still reported with HTTP 200 and the exception is
            // not logged, as in the original endpoints; consider an error status and logging.
            return ResponseEntity.ok().body("操作失败");
        }
    }
}
文章图片
DeferredResult实现实时推送
- 浏览器要实时展示服务端计算出来的数据。一种可能的实现是:浏览器频繁向服务端发起请求以获得服务端数据。
- 若定时周期为S,则数据延迟周期最大即为S。若想缩短数据延迟周期,则应使S尽量小,而S越小,浏览器向服务端发起请求的频率越高,又造成网络握手次数越多,影响了效率。因此,此场景应使用服务端实时推送技术。
- 这里说是推送,其实还是基于请求-响应机制,只不过发起的请求会在服务端挂起,直到请求超时或服务端有数据推送时才会做出响应,响应的时机完全由服务端控制。所以,整体效果看起来就像是服务端真的在“实时推送”一样。
/**
 * Minimal {@code DeferredResult} example: creates a deferred result with a timeout,
 * registers a completion callback and then completes it immediately.
 *
 * @return the deferred result
 */
@RequestMapping("/call")
@ResponseBody
public DeferredResult<Object> call() {
    // The type argument Object is the type of the eventual result.
    DeferredResult<Object> response = new DeferredResult<Object>(
            10000L,          // request timeout; must be a long literal to box to Long
            (Object) null);  // result sent on timeout; the cast pins the (Long, Object)
                             // constructor (newer Spring versions reportedly add a
                             // Supplier overload - TODO confirm)
    response.onCompletion(new Runnable() {
        @Override
        public void run() {
            // Work to do once request processing has completed.
        }
    });
    // Set the result. Calling this responds to the browser immediately;
    // while it has not been called, the request stays suspended.
    response.setResult(new Object());
    return response;
}
执行逻辑
- 浏览器发起异步请求
- 请求到达服务端被挂起(使用浏览器查看请求状态,此时为pending)
- 向浏览器进行响应,分为两种情况:
- 服务端调用DeferredResult.setResult(),请求被唤醒,返回结果。
- 超时,返回一个你设定的结果。
- 浏览器得到响应,处理此次响应结果,然后再次从第1步开始重复
/**
 * Data that carries an identifier.
 */
public interface DeferredData {

    /**
     * Returns the unique identifier.
     *
     * @return the unique identifier
     */
    String getId();
}
DeferredResult的持有者
/**
 * Holder of {@code DeferredResult} objects, looked up by a string key.
 *
 * @param <T> type of the data delivered through the held deferred results
 *            (named {@code T} so it no longer shadows the {@code DeferredData} interface)
 */
public interface IDeferredResultHolder<T> {

    /**
     * Creates a deferred result for the given key.
     *
     * @param key           key for the new deferred result
     * @param timeout       timeout of the deferred result (presumably milliseconds; confirm in the implementation)
     * @param timeoutResult result to use on timeout
     * @return the new deferred result
     */
    DeferredResult<T> newDeferredResult(String key, long timeout, Object timeoutResult);

    /**
     * Stores a deferred result under the given key.
     *
     * @param key            the key
     * @param deferredResult the deferred result to store
     */
    void add(String key, DeferredResult<T> deferredResult);

    /**
     * Looks up the deferred result for the given key.
     *
     * @param key the key
     * @return the deferred result for the key
     */
    DeferredResult<T> get(String key);

    /**
     * Removes the deferred result for the given key.
     *
     * @param key the key
     */
    void remove(String key);

    /**
     * Handles incoming data.
     *
     * @param deferredData the data to handle
     */
    void handleDeferredData(T deferredData);
}
DeferredResult的持有者实现
public class DeferredResultHolder implements IDeferredResultHolder<
DeferredData>
{private Map<
String, DeferredResult<
DeferredData>
>
deferredResults = new ConcurrentHashMap<
String, DeferredResult<
DeferredData>
>
();
public DeferredResult<
DeferredData>
newDeferredResult(String key) {
return new DeferredResult(key, 30 * 1000L, null);
}
public DeferredResult<
DeferredData>
newDeferredResult(String key, long timeout) {
return new DeferredResult(key, timeout, null);
}
public DeferredResult<
DeferredData>
newDeferredResult(String key, Object timeoutResult) {
return new DeferredResult(key, 30 * 1000L, timeoutResult);
}
@Override
public DeferredResult<
DeferredData>
newDeferredResult(String key, long timeout, Object timeoutResult) {
DeferredResult<
DeferredData>
deferredResult = newDeferredResult<
DeferredData>
(timeout, timeoutResult);
add(key, deferredResult);
deferredResult.onCompletion(new Runnable() {
@Override
public void run() {
remove(key);
}
});
return deferredResult;
}@Override
public void add(String key, DeferredResult<
DeferredData>
deferredResult) {
deferredResults.put(key, deferredResult);
}@Override
public DeferredResult<
DeferredData>
get(String key) {
return deferredResults.get(key);
}@Override
public void remove(String key) {
deferredResults.remove(key);
}@Override
public void handleDeferredData(DeferredData deferredData) {
String key = deferredData.getId();
DeferredResult<
DeferredData>
deferredResult = get(key);
if (deferredResult != null) {
deferredResult.setResult(deferredData);
}
}
}
调用端
/**
 * Caller side of the example: the {@code /call} endpoint registers a deferred
 * result and returns it, leaving the request pending until it is completed
 * or times out.
 */
@RequestMapping
@Controller
public class CallController {

    @Autowired
    private DeferredResultHolder deferredResultHolder;

    /**
     * Registers a deferred result under the fixed id {@code "abc"}.
     *
     * @return the registered deferred result
     */
    @RequestMapping("/call")
    @ResponseBody
    public DeferredResult<DeferredData> call() {
        final String requestId = "abc";
        final long timeout = 10 * 1000L;
        return deferredResultHolder.newDeferredResult(requestId, timeout, null);
    }
}
触发返回端
/**
 * Trigger side of the example: the {@code /finished} endpoint hands data to the
 * holder, which completes the deferred result registered under the same id.
 *
 * <p>NOTE(review): this snippet uses the same class name as the caller-side
 * controller in this document; merge the two or rename one if they share a package.
 */
@RequestMapping
@Controller
public class CallController {

    @Autowired
    private DeferredResultHolder deferredResultHolder;

    /**
     * Delivers data for the fixed id {@code "abc"} to the holder.
     */
    @RequestMapping("/finished")
    @ResponseBody
    public void finished() {
        String id = "abc";
        // CustomerDeferredData is a model class implementing the DeferredData interface.
        DeferredData deferredData = new CustomerDeferredData(id);
        // handleDeferredData returns void, so it is called as a statement rather than
        // returned (returning a value from a void method does not compile).
        deferredResultHolder.handleDeferredData(deferredData);
    }
}
推荐阅读
- Alibaba工具型技术系列「EasyExcel技术专题」实战项目中常用的Excel操作指南
- Spring功能介绍加载时织入机制的Aspectj和LoadTimeWeaving技术
- C语言进阶——数据的存储
- u盘安装win7系统详细图文详细教程图解
- 装机高手教你怎样用u盘安装win7系统
- U打开u盘制作工具最新推荐
- 装机高手教你处理mmc无法创建管理单元
- 装机高手教你bios设置硬盘模式
- 装机高手教你进入bios