
Elex'Blog
文章
标签
10

阅

SpringBoot集成Quartz自定义表管理定时器
前言
本文用到的第三方框架:MyBatis-Plus、Hutool、Quartz。
1. 建表
-- Job-management table: one row per scheduled job.
-- Code values, as stated in the column comments below:
--   job_status: 0 = running, 1 = paused
--   job_type:   0 = one-off job (uses start_time), 1 = long-running job (uses cron_expression)
--   deleted:    0 = normal, 1 = deleted (soft-delete flag)
-- bean_class holds the name of the class to execute for the job.
CREATE TABLE `quartz_job_manager` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`job_name` varchar(255) NOT NULL COMMENT '任务名称',
`job_group` varchar(50) DEFAULT NULL COMMENT '任务分组',
`start_time` datetime DEFAULT NULL COMMENT '一次行执行时间',
`cron_expression` varchar(30) DEFAULT NULL COMMENT 'cron表达式',
`bean_class` varchar(255) DEFAULT NULL COMMENT '执行方法',
`job_status` int(11) DEFAULT NULL COMMENT '任务状态;0.执行 1.暂停',
`job_type` int(11) DEFAULT NULL COMMENT '任务类型;0.一次性任务 1.长期任务',
`deleted` int(11) DEFAULT NULL COMMENT '0正常,1已删除',
`creator_id` int(11) DEFAULT NULL COMMENT '创建用户Id',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`modifier_id` int(11) DEFAULT NULL COMMENT '修改用户id',
`modify_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`,`job_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3120 DEFAULT CHARSET=utf8 COMMENT='定时任务管理';
2. 导包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
3. 添加配置文件
在resources下添加quartz.properties配置文件
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#org.quartz.scheduler.rmi.export: false
#org.quartz.scheduler.rmi.proxy: false
#org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 20
#org.quartz.threadPool.threadPriority: 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#org.quartz.jobStore.misfireThreshold: 60000
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
4. 添加定时器工厂类
@Component
public class JobFactory extends AdaptableJobFactory {

    /**
     * Bean factory used to inject Spring-managed dependencies into job objects,
     * which are instantiated by Quartz rather than by the Spring container.
     */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Builds the job through the parent factory and then autowires it.
     *
     * @param bundle the fired-trigger bundle handed over by Quartz
     * @return the job instance after autowiring
     * @throws Exception if the parent factory cannot create the job
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        capableBeanFactory.autowireBean(job);
        return job;
    }
}
5. 添加定时器配置
/**
 * Spring configuration that exposes the Quartz {@link SchedulerFactoryBean}, the Quartz
 * properties loaded from {@code quartz.properties}, and the resulting {@link Scheduler}.
 */
@Configuration
public class JobConfigration {

    /**
     * Creates the scheduler factory, wired with the Spring-aware job factory and the
     * properties read from {@code quartz.properties}.
     *
     * @param jobFactory job factory that autowires job instances
     * @return the configured factory bean
     * @throws IllegalStateException if {@code quartz.properties} cannot be read; previously the
     *         failure was only printed and a partially configured factory was returned
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setOverwriteExistingJobs(true);
        factoryBean.setJobFactory(jobFactory);
        try {
            factoryBean.setQuartzProperties(quartzProperties());
        } catch (IOException e) {
            // Fail fast with the cause attached instead of silently continuing without the properties.
            throw new IllegalStateException("Failed to load quartz.properties from the classpath", e);
        }
        return factoryBean;
    }

    /**
     * Loads {@code quartz.properties} from the classpath.
     *
     * @return the loaded properties
     * @throws IOException if the resource cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * Exposes the scheduler produced by {@link #schedulerFactoryBean(JobFactory)}.
     *
     * @param jobFactory job factory passed through to the factory bean method
     * @return the Quartz scheduler
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler(JobFactory jobFactory) {
        // NOTE(review): presumably relies on @Configuration proxying so this call returns the
        // already-initialised factory bean rather than a fresh instance -- confirm.
        return schedulerFactoryBean(jobFactory).getScheduler();
    }
}
6. 定时任务操作类
@Component
@Slf4j
public class JobManagerUtil {
@Resource
private Scheduler scheduler;
/**
* 添加任务2
*
* @param jobName jobName
* @param jobGroup jobGroup
* @param cronExpression cronExpression
* @param beanClass beanClass
*/
public void addCommonJob(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
try {
// 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
// 定义调度触发规则 使用 cornTrigger 规则
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
.startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
// 启动
if (!scheduler.isStarted()) {
scheduler.start();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 添加任务3 一次性任务
*
* @param jobName jobName
* @param jobGroup jobGroup
* @param startTime startTime
* @param beanClass beanClass
*/
public void addSingleCommonJob(String jobName, String jobGroup, Date startTime, Class<? extends Job> beanClass) {
try {
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
SimpleTrigger singleTrigger = TriggerBuilder.newTrigger()
.withIdentity(jobName, jobGroup).startAt(startTime).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1))
.build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, singleTrigger);
// 启动
if (!scheduler.isStarted()) {
scheduler.start();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 添加一个停止任务
* Params:
* jobName – jobName
* jobGroup – jobGroup
* startTime – startTime
* beanClass – beanClass
*/
public void addCommonJobShutdown(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
try {
// 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
// 定义调度触发规则 使用 cornTrigger 规则
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
.startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
// 停止
if (scheduler.isStarted()) {
scheduler.shutdown();
}
} catch (Exception e) {
log.error("add Job error", e);
}
}
/**
* 暂停任务
*/
public void pauseJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
log.error("pauseJob error", e);
}
}
/**
* 恢复任务
*/
public void resumeJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
log.error("resumeJob error", e);
}
}
/**
* 删除任务
*/
public void deleteJob(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
log.debug("定时任务:{}.{}-已删除!", jobName, jobGroup);
}
} catch (SchedulerException e) {
log.error("delete job error", e);
}
}
/**
* 启动任务(将现有任务再执行一次<仅执行一次并非新建了一个任务>
*/
public void runJobNow(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
log.error("run Job now error", e);
}
}
/**
* 查看任务是否存在
*/
public Boolean checkExists(String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
return scheduler.checkExists(jobKey);
} catch (SchedulerException e) {
log.error("pauseJob error", e);
return false;
}
}
}
7. 定时任务实体
/**
 * Scheduled-job management record; each field maps to the column named in its annotation.
 * @TableName quartz_job_manager
 */
@TableName(value ="quartz_job_manager")
@Data
public class QuartzJobManager implements Serializable {
/**
 * Primary key (auto-generated id).
 */
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
 * Job name.
 */
@TableField(value = "job_name")
private String jobName;
/**
 * Job group.
 */
@TableField(value = "job_group")
private String jobGroup;
/**
 * Execution time for a one-off job.
 */
@TableField(value = "start_time")
private Date startTime;
/**
 * Cron expression.
 */
@TableField(value = "cron_expression")
private String cronExpression;
/**
 * Name of the class to execute for this job.
 */
@TableField(value = "bean_class")
private String beanClass;
/**
 * Job status: 0 = running, 1 = paused.
 */
@TableField(value = "job_status")
private Integer jobStatus;
/**
 * Job type: 0 = one-off job, 1 = long-running job.
 */
@TableField(value = "job_type")
private Integer jobType;
/**
 * Soft-delete flag: 0 = normal, 1 = deleted.
 */
@TableField(value = "deleted")
private Integer deleted;
/**
 * Id of the user who created the record.
 */
@TableField(value = "creator_id")
private Integer creatorId;
/**
 * Creation time.
 */
@TableField(value = "create_time")
private Date createTime;
/**
 * Id of the user who last modified the record.
 */
@TableField(value = "modifier_id")
private Integer modifierId;
/**
 * Last modification time.
 */
@TableField(value = "modify_time")
private Date modifyTime;
// Not a table column: excluded from the mapping via exist = false.
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}
7.1. 任务操作类型枚举
7.1.1. 任务状态枚举
/**
 * Job status codes and their display descriptions.
 *
 * @author Elex
 * @date 2022-06-10 17:16
 */
public enum JobStatusEnum {
    NORMAL(0, "正常"),
    PAUSE(1, "暂停");

    /** Numeric status code. */
    private final Integer code;

    /** Chinese display description. */
    private final String desc;

    JobStatusEnum(Integer code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * @return the numeric status code
     */
    public Integer getCode() {
        return code;
    }

    /**
     * @return the display description
     */
    public String getDesc() {
        return desc;
    }

    /**
     * Maps a status code to its description.
     *
     * @param code status code to look up; may be {@code null}
     * @return the description of the matching constant
     * @throws IllegalArgumentException if no constant has this code (previously this surfaced
     *         as a bare {@link NullPointerException})
     */
    public static String reversal(Integer code) {
        for (JobStatusEnum status : values()) {
            if (status.code.equals(code)) {
                return status.desc;
            }
        }
        throw new IllegalArgumentException("Unknown job status code: " + code);
    }
}
7.1.2. 任务操作枚举
/**
 * Operations that can be requested on a job (run, pause, recover), with their codes and
 * display descriptions.
 *
 * @author Elex
 * @date 2022-06-10 16:57
 */
public enum JobTaskEnum {
    RUN(0, "运行"),
    PAUSE(1, "暂停"),
    RECOVER(2, "恢复");

    /** Numeric operation code. */
    private final Integer code;

    /** Chinese display description. */
    private final String desc;

    JobTaskEnum(Integer code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * @return the numeric operation code
     */
    public Integer getCode() {
        return code;
    }

    /**
     * @return the display description
     */
    public String getDesc() {
        return desc;
    }

    /**
     * Maps an operation code to its description.
     *
     * @param code operation code to look up; may be {@code null}
     * @return the description of the matching constant
     * @throws IllegalArgumentException if no constant has this code (previously this surfaced
     *         as a bare {@link NullPointerException})
     */
    public static String reversal(Integer code) {
        for (JobTaskEnum task : values()) {
            if (task.code.equals(code)) {
                return task.desc;
            }
        }
        throw new IllegalArgumentException("Unknown job operation code: " + code);
    }
}
7.1.3. 任务类型枚举
/**
 * Job type codes and their display descriptions.
 *
 * @author Elex
 * @date 2022-06-10 17:23
 */
public enum JobTypeEnum {
    ONE_OFF_TASK(0, "一次性任务"),
    PERSISTENT_TASK(1, "持久性任务");

    /** Numeric type code. */
    private final Integer code;

    /** Chinese display description. */
    private final String desc;

    JobTypeEnum(Integer code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * @return the numeric type code
     */
    public Integer getCode() {
        return code;
    }

    /**
     * @return the display description
     */
    public String getDesc() {
        return desc;
    }
}
8. 定时任务操作接口
8.1. 定时任务操作接口类
/**
 * Database service for the quartz_job_manager table (scheduled-job management).
 *
 * @author Elex
 * @description Service for table quartz_job_manager (scheduled-job management)
 * @createDate 2022-06-10 18:45:58
 */
public interface QuartzJobManagerService extends IService<QuartzJobManager> {
/**
 * Saves a job.
 *
 * @author Elex
 * @date 2022/6/10 17:01
 * @param job the job record to save
 * @return the saved job record
 */
QuartzJobManager saveJobManager(QuartzJobManager job);
/**
 * Applies an operation to the job with the given id.
 *
 * @author Elex
 * @date 2022/6/10 17:01
 * @param operation operation code (presumably a JobTaskEnum code -- confirm against the implementation)
 * @param id id of the job record
 * @return operation result flag
 */
Boolean taskOperation(Integer operation, Integer id);
/**
 * Deletes the jobs with the given ids.
 *
 * @author Elex
 * @date 2022/6/10 17:01
 * @param ids ids of the job records to delete
 * @return operation result flag
 */
Boolean deleteById(List<Integer> ids);
/**
 * Deletes a job identified by its name and group.
 *
 * @author Elex
 * @date 2022/6/12 11:04
 * @param jobName name of the job to delete
 * @param jobGroup group of the job to delete
 * @return operation result flag
 */
Boolean deleteByJobName(String jobName, String jobGroup);
/**
 * Initializes the jobs.
 *
 * @author Elex
 * @date 2022/6/10 17:01
 * @return operation result flag
 */
Boolean initJob();
}
8.2. 定时任务操作接口实现类
/**
 * Service implementation for the quartz_job_manager table. Keeps the persisted job records and
 * the jobs registered in the Quartz scheduler (through {@link JobManagerUtil}) in step.
 *
 * @author Elex
 * @description Service implementation for table quartz_job_manager (scheduled-job management)
 * @createDate 2022-06-10 18:45:58
 */
@Slf4j
@Service
public class QuartzJobManagerServiceImpl extends ServiceImpl<QuartzJobManagerMapper, QuartzJobManager>
        implements QuartzJobManagerService {

    /** Value of the {@code deleted} column for live rows. */
    private static final int NOT_DELETED = 0;

    /** Value of the {@code deleted} column for soft-deleted rows. */
    private static final int DELETED = 1;

    @Autowired
    private JobManagerUtil jobManagerUtil;

    /** Scheduler action for each operation code; filled once in {@link #init()}. */
    private final Map<Integer, BiConsumer<String, String>> taskOperationMap = new HashMap<>();

    /** Registers the scheduler action for every {@link JobTaskEnum} operation. */
    @PostConstruct
    void init() {
        taskOperationMap.put(JobTaskEnum.RUN.getCode(), jobManagerUtil::runJobNow);
        taskOperationMap.put(JobTaskEnum.PAUSE.getCode(), jobManagerUtil::pauseJob);
        taskOperationMap.put(JobTaskEnum.RECOVER.getCode(), jobManagerUtil::resumeJob);
    }

    /**
     * Persists a new job record and registers it with the scheduler.
     *
     * @param jobManager job record to save; its name must not clash with a live record
     * @return the saved record
     * @throws RuntimeException if a live record with the same name already exists
     * @throws ClassCastException if {@code beanClass} does not implement {@code Job}
     */
    @SneakyThrows
    @Override
    public QuartzJobManager saveJobManager(QuartzJobManager jobManager) {
        long count = count(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getJobName, jobManager.getJobName())
                .eq(QuartzJobManager::getDeleted, NOT_DELETED));
        if (count > 0) {
            throw new RuntimeException("定时任务名称已存在");
        }
        // Resolve the class before writing, so a bad bean_class does not leave a row
        // that can never be scheduled.
        Class<? extends Job> jobClass = resolveJobClass(jobManager.getBeanClass());
        jobManager.setDeleted(NOT_DELETED);
        jobManager.setJobStatus(JobStatusEnum.NORMAL.getCode());
        save(jobManager);
        if (JobTypeEnum.ONE_OFF_TASK.getCode().equals(jobManager.getJobType())) {
            jobManagerUtil.addSingleCommonJob(jobManager.getJobName(), jobManager.getJobGroup(),
                    jobManager.getStartTime(), jobClass);
        } else {
            jobManagerUtil.addCommonJob(jobManager.getJobName(), jobManager.getJobGroup(),
                    jobManager.getCronExpression(), jobClass);
        }
        return jobManager;
    }

    /**
     * Runs, pauses or resumes the job with the given id and records the resulting status.
     *
     * @param operation a {@link JobTaskEnum} code
     * @param id        id of the job record
     * @return always {@code true}
     * @throws IllegalArgumentException if the record does not exist or the operation is unknown
     * @throws RuntimeException if the operation is not allowed in the job's current status
     */
    @Override
    public Boolean taskOperation(Integer operation, Integer id) {
        QuartzJobManager jobManager = getById(id);
        if (jobManager == null) {
            throw new IllegalArgumentException("定时任务不存在");
        }
        BiConsumer<String, String> action = taskOperationMap.get(operation);
        if (action == null) {
            throw new IllegalArgumentException("不支持的任务操作类型");
        }
        taskOperationVerify(operation, jobManager);
        action.accept(jobManager.getJobName(), jobManager.getJobGroup());
        // RUN only fires the job once and must not flip a paused job back to NORMAL.
        if (!JobTaskEnum.RUN.getCode().equals(operation)) {
            jobManager.setJobStatus(operationToStatus(operation));
            // The row already exists, so update it; save() would attempt an insert.
            updateById(jobManager);
        }
        return true;
    }

    /**
     * Soft-deletes the records with the given ids and removes their jobs from the scheduler.
     * Ids without a matching record are skipped.
     *
     * @param ids ids of the records to delete; must not be empty
     * @return always {@code true}
     */
    @Override
    public Boolean deleteById(List<Integer> ids) {
        Assert.notEmpty(ids, "ids不能为空");
        // Sequential on purpose: the work is database and scheduler side effects.
        for (Integer id : ids) {
            QuartzJobManager jobManager = getById(id);
            if (Objects.nonNull(jobManager)) {
                markDeletedAndUnschedule(jobManager);
            }
        }
        return true;
    }

    /**
     * Soft-deletes the live record with the given name and group and removes its job from the
     * scheduler. Does nothing if no such record exists.
     *
     * @param jobName  job name; must not be empty
     * @param jobGroup job group; must not be empty
     * @return always {@code true}
     * @throws RuntimeException if the name or the group is empty
     */
    @Override
    public Boolean deleteByJobName(String jobName, String jobGroup) {
        if (StrUtil.isEmpty(jobName) || StrUtil.isEmpty(jobGroup)) {
            // The message now names both checked arguments, not only the group.
            throw new RuntimeException("任务名称和任务分组不能为空");
        }
        QuartzJobManager jobManager = getOne(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getJobName, jobName)
                .eq(QuartzJobManager::getJobGroup, jobGroup)
                .eq(QuartzJobManager::getDeleted, NOT_DELETED));
        if (Objects.nonNull(jobManager)) {
            markDeletedAndUnschedule(jobManager);
        }
        return true;
    }

    /**
     * Registers the persisted jobs with the scheduler: every live long-running job whose status
     * is NORMAL, and every live one-off job whose start time is still in the future.
     *
     * @return always {@code true}
     */
    @Override
    public Boolean initJob() {
        Integer oneOff = JobTypeEnum.ONE_OFF_TASK.getCode();
        // The old condition was "deleted = 0 OR (...)", which also matched one-off jobs whose
        // start time had already passed, so they were scheduled again on every start.
        List<QuartzJobManager> jobManagers = list(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getDeleted, NOT_DELETED)
                .and(wrapper -> wrapper
                        .ne(QuartzJobManager::getJobType, oneOff)
                        .or()
                        .gt(QuartzJobManager::getStartTime, DateUtil.now())));
        for (QuartzJobManager jobManager : jobManagers) {
            try {
                if (oneOff.equals(jobManager.getJobType())) {
                    jobManagerUtil.addSingleCommonJob(jobManager.getJobName(), jobManager.getJobGroup(),
                            jobManager.getStartTime(), resolveJobClass(jobManager.getBeanClass()));
                } else if (JobStatusEnum.NORMAL.getCode().equals(jobManager.getJobStatus())) {
                    // NOTE(review): paused long-running jobs are not registered here, so a later
                    // "recover" has no scheduler job to resume -- confirm whether that is intended.
                    jobManagerUtil.addCommonJob(jobManager.getJobName(), jobManager.getJobGroup(),
                            jobManager.getCronExpression(), resolveJobClass(jobManager.getBeanClass()));
                }
            } catch (ClassNotFoundException | ClassCastException e) {
                log.error("获取BeanClass错误:", e);
            }
        }
        return true;
    }

    /** Flags the record as deleted, saves it, and removes its job from the scheduler. */
    private void markDeletedAndUnschedule(QuartzJobManager jobManager) {
        jobManager.setDeleted(DELETED);
        saveOrUpdate(jobManager);
        jobManagerUtil.deleteJob(jobManager.getJobName(), jobManager.getJobGroup());
    }

    /** Loads the named class and checks that it implements {@code Job}. */
    private static Class<? extends Job> resolveJobClass(String beanClass) throws ClassNotFoundException {
        return Class.forName(beanClass).asSubclass(Job.class);
    }

    /** Maps an operation code to the status it leaves behind: PAUSE gives PAUSE, anything else NORMAL. */
    private Integer operationToStatus(Integer operation) {
        if (JobTaskEnum.PAUSE.getCode().equals(operation)) {
            return JobStatusEnum.PAUSE.getCode();
        }
        return JobStatusEnum.NORMAL.getCode();
    }

    /** Rejects "recover" on a job in NORMAL status and "pause" on a job that is not. */
    private void taskOperationVerify(Integer operation, QuartzJobManager jobManager) {
        boolean normal = JobStatusEnum.NORMAL.getCode().equals(jobManager.getJobStatus());
        if (normal && JobTaskEnum.RECOVER.getCode().equals(operation)) {
            throw new RuntimeException("任务正常状态下,无法使用恢复");
        }
        if (!normal && JobTaskEnum.PAUSE.getCode().equals(operation)) {
            throw new RuntimeException("任务暂停状态下,无法使用暂停");
        }
    }
}
9. 启动初始化加载任务
/**
 * Loads the persisted jobs into the scheduler once the application has started.
 *
 * @author Elex
 */
@Component
public class JobApplicationRunner implements ApplicationRunner {

    // The original referenced a misspelled type (JobManagerSreviceb) and method (ininJob);
    // the service declared in this file is QuartzJobManagerService with initJob().
    @Autowired
    private QuartzJobManagerService quartzJobManagerService;

    /**
     * Invokes the job initialization on the service.
     *
     * @param args application arguments (unused)
     * @throws Exception if initialization fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        quartzJobManagerService.initJob();
    }
}