音乐播放器
Elex'Blog
 
文章 标签
10

Powered by Elex | Theme: Fog
载入天数...
载入时分秒...

SpringBoot集成Quartz自定义表管理定时器

前言

本文用到的第三方框架:MyBatis-Plus、Hutool、Quartz。

1. 建表

-- Management table for user-defined scheduled jobs: one row per job, holding
-- its identity (name/group), its schedule (a one-off start time or a cron
-- expression), the class to execute, a status/type pair, and soft-delete and
-- audit columns.
CREATE TABLE `quartz_job_manager` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `job_name` varchar(255) NOT NULL COMMENT '任务名称',
  `job_group` varchar(50) DEFAULT NULL COMMENT '任务分组',
  -- start_time is the execution time of a one-off job.
  `start_time` datetime DEFAULT NULL COMMENT '一次行执行时间',
  `cron_expression` varchar(30) DEFAULT NULL COMMENT 'cron表达式',
  `bean_class` varchar(255) DEFAULT NULL COMMENT '执行方法',
  -- job_status: 0 = running, 1 = paused.
  `job_status` int(11) DEFAULT NULL COMMENT '任务状态;0.执行 1.暂停',
  -- job_type: 0 = one-off job, 1 = long-running (recurring) job.
  `job_type` int(11) DEFAULT NULL COMMENT '任务类型;0.一次性任务 1.长期任务',
  -- deleted: soft-delete flag, 0 = live, 1 = deleted.
  `deleted` int(11) DEFAULT NULL COMMENT '0正常,1已删除',
  `creator_id` int(11) DEFAULT NULL COMMENT '创建用户Id',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `modifier_id` int(11) DEFAULT NULL COMMENT '修改用户id',
  `modify_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  -- Composite key: uniqueness is on the (id, job_name) pair, so this key by
  -- itself does not prevent two rows from sharing a job_name.
  PRIMARY KEY (`id`,`job_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3120 DEFAULT CHARSET=utf8 COMMENT='定时任务管理';

2. 导包

<!-- Quartz scheduler library. No <version> is declared here, so the version
     has to be supplied by the project's dependency management. -->
<dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
</dependency>

3. 添加配置文件

在resources下添加quartz.properties配置文件

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#org.quartz.scheduler.rmi.export: false
#org.quartz.scheduler.rmi.proxy: false
#org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
# The only active setting in this file: the thread count of the Quartz thread
# pool. Every other entry is commented out.
# NOTE(review): the commented-out entries presumably fall back to Quartz's
# built-in defaults - confirm against the Quartz version in use.
org.quartz.threadPool.threadCount= 20
#org.quartz.threadPool.threadPriority: 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#org.quartz.jobStore.misfireThreshold: 60000
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

4. 添加定时器工厂类

/**
 * Job factory that makes Spring dependency injection available inside Quartz jobs.
 *
 * <p>Job instances are created by Quartz rather than by the Spring container, so every
 * newly created instance is handed to the bean factory to have its dependencies autowired.
 */
@Component
public class JobFactory extends AdaptableJobFactory {

    /** Bean factory used to autowire job instances that Spring did not create itself. */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Builds the job through the parent implementation and then autowires it.
     *
     * @param bundle the fired-trigger bundle describing the job to instantiate
     * @return the job instance after Spring has injected its dependencies
     * @throws Exception if the parent implementation fails to create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        capableBeanFactory.autowireBean(job);
        return job;
    }
}

5. 添加定时器配置

/**
 * Spring configuration that wires up the Quartz scheduler.
 *
 * <p>Exposes the {@link SchedulerFactoryBean}, the Quartz properties loaded from
 * {@code quartz.properties} on the classpath, and the resulting {@link Scheduler}.
 */
@Configuration
public class JobConfigration {

    /**
     * Builds the scheduler factory with the custom job factory and the Quartz properties.
     *
     * @param jobFactory job factory that autowires Spring dependencies into job instances
     * @return the configured scheduler factory bean
     * @throws IllegalStateException if {@code quartz.properties} cannot be loaded
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setOverwriteExistingJobs(true);
        factoryBean.setJobFactory(jobFactory);
        try {
            factoryBean.setQuartzProperties(quartzProperties());
        } catch (IOException e) {
            // Fail fast: previously the error was only printed and the scheduler started
            // silently without the settings from quartz.properties.
            throw new IllegalStateException("Failed to load quartz.properties from the classpath", e);
        }
        return factoryBean;
    }

    /**
     * Loads {@code /quartz.properties} from the classpath.
     *
     * @return the loaded Quartz properties
     * @throws IOException if the properties file cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * Exposes the scheduler produced by {@link #schedulerFactoryBean(JobFactory)}.
     *
     * @param jobFactory job factory passed through to the scheduler factory
     * @return the scheduler obtained from the scheduler factory bean
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler(JobFactory jobFactory) {
        return schedulerFactoryBean(jobFactory).getScheduler();
    }
}

6. 定时任务操作类

@Component
@Slf4j
public class JobManagerUtil {
    @Resource
    private Scheduler scheduler;

    /**
     * 添加任务2
     *
     * @param jobName        jobName
     * @param jobGroup       jobGroup
     * @param cronExpression cronExpression
     * @param beanClass      beanClass
     */
    public void addCommonJob(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
        try {
            // 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            // 定义调度触发规则 使用 cornTrigger 规则
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                    .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, trigger);
            // 启动
            if (!scheduler.isStarted()) {
                scheduler.start();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 添加任务3 一次性任务
     *
     * @param jobName   jobName
     * @param jobGroup  jobGroup
     * @param startTime startTime
     * @param beanClass beanClass
     */
    public void addSingleCommonJob(String jobName, String jobGroup, Date startTime, Class<? extends Job> beanClass) {
        try {
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            SimpleTrigger singleTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(jobName, jobGroup).startAt(startTime).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1))
                    .build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, singleTrigger);
            // 启动
            if (!scheduler.isStarted()) {
                scheduler.start();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 添加一个停止任务
     * Params:
     * jobName – jobName
     * jobGroup – jobGroup
     * startTime – startTime
     * beanClass – beanClass
     */
    public void addCommonJobShutdown(String jobName, String jobGroup, String cronExpression, Class<? extends Job> beanClass) {
        try {
            // 创建jobDetail实例,绑定Job实现类 ,指明job的名称,所在组的名称,以及绑定job类
            JobDetail jobDetail = JobBuilder.newJob(beanClass).withIdentity(jobName, jobGroup).build();
            // 定义调度触发规则 使用 cornTrigger 规则
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                    .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow().build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, trigger);
            // 停止
            if (scheduler.isStarted()) {
                scheduler.shutdown();
            }
        } catch (Exception e) {
            log.error("add Job error", e);
        }
    }

    /**
     * 暂停任务
     */
    public void pauseJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            log.error("pauseJob error", e);
        }
    }

    /**
     * 恢复任务
     */
    public void resumeJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            log.error("resumeJob error", e);
        }
    }

    /**
     * 删除任务
     */
    public void deleteJob(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            if (scheduler.checkExists(jobKey)) {
                scheduler.deleteJob(jobKey);
                log.debug("定时任务:{}.{}-已删除!", jobName, jobGroup);
            }
        } catch (SchedulerException e) {
            log.error("delete job error", e);
        }
    }

    /**
     * 启动任务(将现有任务再执行一次<仅执行一次并非新建了一个任务>
     */
    public void runJobNow(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            log.error("run Job now error", e);
        }
    }

    /**
     * 查看任务是否存在
     */
    public Boolean checkExists(String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            return scheduler.checkExists(jobKey);
        } catch (SchedulerException e) {
            log.error("pauseJob error", e);
            return false;
        }
    }

}

7. 定时任务实体

/**
 * Scheduled-job management record, mapped to the {@code quartz_job_manager} table.
 * @TableName quartz_job_manager
 */
@TableName(value ="quartz_job_manager")
@Data
public class QuartzJobManager implements Serializable {
    /**
     * Primary key id (auto-increment).
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /**
     * Job name.
     */
    @TableField(value = "job_name")
    private String jobName;

    /**
     * Job group.
     */
    @TableField(value = "job_group")
    private String jobGroup;

    /**
     * Execution time of a one-off job.
     */
    @TableField(value = "start_time")
    private Date startTime;

    /**
     * Cron expression.
     */
    @TableField(value = "cron_expression")
    private String cronExpression;

    /**
     * Class to execute; the service loads it by name with {@code Class.forName}.
     */
    @TableField(value = "bean_class")
    private String beanClass;

    /**
     * Job status: 0 = running, 1 = paused.
     */
    @TableField(value = "job_status")
    private Integer jobStatus;

    /**
     * Job type: 0 = one-off job, 1 = long-running job.
     */
    @TableField(value = "job_type")
    private Integer jobType;

    /**
     * Soft-delete flag: 0 = normal, 1 = deleted.
     */
    @TableField(value = "deleted")
    private Integer deleted;

    /**
     * Id of the user who created the record.
     */
    @TableField(value = "creator_id")
    private Integer creatorId;

    /**
     * Creation time.
     */
    @TableField(value = "create_time")
    private Date createTime;

    /**
     * Id of the user who last modified the record.
     */
    @TableField(value = "modifier_id")
    private Integer modifierId;

    /**
     * Last modification time.
     */
    @TableField(value = "modify_time")
    private Date modifyTime;

    // Serialization version id; marked as not existing in the table.
    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
}

7.1. 任务操作类型枚举

7.1.1. 任务状态枚举

/**
 * Job status codes as stored in {@code job_status}.
 *
 * @author Elex
 * @date 2022-06-10 17:16
 */
@Getter
@AllArgsConstructor
public enum JobStatusEnum {

    /** The job is active. */
    NORMAL(0,"正常"),

    /** The job is paused. */
    PAUSE(1,"暂停");


    /**
     * Numeric code.
     */
    private final Integer code;
    /**
     * Chinese description.
     */
    private final String desc;

    /**
     * Looks up the description that belongs to a status code.
     *
     * @param code status code to resolve; may be {@code null}
     * @return the description of the matching constant
     * @throws IllegalArgumentException if no constant has the given code (including
     *         {@code null}); previously this surfaced as a bare NullPointerException
     */
    public static String reversal(Integer code) {
        for (JobStatusEnum status : values()) {
            if (status.code.equals(code)) {
                return status.desc;
            }
        }
        throw new IllegalArgumentException("Unknown job status code: " + code);
    }

}

7.1.2. 任务操作枚举

/**
 * Operations that can be applied to a job: run, pause, recover.
 *
 * @author Elex
 * @date 2022-06-10 16:57
 */
@Getter
@AllArgsConstructor
public enum JobTaskEnum {
    /** Run the job. */
    RUN(0, "运行"),
    /** Pause the job. */
    PAUSE(1, "暂停"),
    /** Recover (resume) the job. */
    RECOVER(2, "恢复");

    /**
     * Numeric code.
     */
    private final Integer code;
    /**
     * Chinese description.
     */
    private final String desc;

    /**
     * Looks up the description that belongs to an operation code.
     *
     * @param code operation code to resolve; may be {@code null}
     * @return the description of the matching constant
     * @throws IllegalArgumentException if no constant has the given code (including
     *         {@code null}); previously this surfaced as a bare NullPointerException
     */
    public static String reversal(Integer code) {
        for (JobTaskEnum task : values()) {
            if (task.code.equals(code)) {
                return task.desc;
            }
        }
        throw new IllegalArgumentException("Unknown job operation code: " + code);
    }

}

7.1.3. 任务类型枚举

/**
 * Job type codes as stored in {@code job_type}.
 *
 * @author Elex
 * @date 2022-06-10 17:23
 */
@Getter
@AllArgsConstructor
public enum JobTypeEnum {
    /** One-off job. */
    ONE_OFF_TASK(0,"一次性任务"),

    /** Persistent job. */
    PERSISTENT_TASK(1,"持久性任务");


    /**
     * Numeric code. Final so that shared enum state cannot be reassigned.
     */
    private final Integer code;
    /**
     * Chinese description. Final so that shared enum state cannot be reassigned.
     */
    private final String desc;


}

8. 定时任务操作接口

8.1. 定时任务操作接口类

/**
* Service for the {@code quartz_job_manager} table (scheduled-job management).
*
* @author Elex
* @description Database operation service for table quartz_job_manager (scheduled-job management)
* @createDate 2022-06-10 18:45:58
*/
public interface QuartzJobManagerService extends IService<QuartzJobManager> {

    /**
     * Saves a job.
     *
     * @author Elex
     * @date 2022/6/10 17:01
     * @param job the job definition to save
     * @return com.shiyue.ticket.po.QuartzJobManager the saved job
     */
    QuartzJobManager saveJobManager(QuartzJobManager job);

    /**
     * Applies an operation to a job.
     *
     * @author Elex
     * @date 2022/6/10 17:01
     * @param operation operation code (see {@code JobTaskEnum}: 0 run, 1 pause, 2 recover)
     * @param id id of the job record to operate on
     * @return java.lang.Boolean {@code true} when the operation completes
     */
    Boolean taskOperation(Integer operation, Integer id);

    /**
     * Deletes jobs by id.
     *
     * @author Elex
     * @date 2022/6/10 17:01
     * @param ids ids of the job records to delete
     * @return java.lang.Boolean {@code true} when the operation completes
     */
    Boolean deleteById(List<Integer> ids);

    /**
     * Deletes a job by its name and group.
     *
     * @author Elex
     * @date 2022/6/12 11:04
     * @param jobName name of the job to delete
     * @param jobGroup group of the job to delete
     * @return java.lang.Boolean {@code true} when the operation completes
     */
    Boolean deleteByJobName(String jobName, String jobGroup);

    /**
     * Initializes the jobs.
     *
     * @author Elex
     * @date 2022/6/10 17:01
     * @return java.lang.Boolean {@code true} when the operation completes
     */
    Boolean initJob();
}

8.2. 定时任务操作接口实现类

/**
* Service implementation for the {@code quartz_job_manager} table. It keeps the table rows
* and the jobs registered through {@link JobManagerUtil} in step with each other.
*
* @author Elex
* @description Database operation service implementation for table quartz_job_manager
* @createDate 2022-06-10 18:45:58
*/
@Slf4j
@Service
public class QuartzJobManagerServiceImpl extends ServiceImpl<QuartzJobManagerMapper, QuartzJobManager>
    implements QuartzJobManagerService {

    /** Value of the {@code deleted} column for live rows. */
    private static final int NOT_DELETED = 0;

    /** Value of the {@code deleted} column for soft-deleted rows. */
    private static final int DELETED = 1;

    @Autowired
    private JobManagerUtil jobManagerUtil;

    /** Scheduler action for each {@link JobTaskEnum} code; filled once in {@link #init()}. */
    private final Map<Integer, BiConsumer<String, String>> taskOperationMap = new HashMap<>();

    /** Registers the scheduler action for every supported operation code. */
    @PostConstruct
    void init() {
        taskOperationMap.put(JobTaskEnum.RUN.getCode(), (jobName, jobGroup) -> jobManagerUtil.runJobNow(jobName, jobGroup));
        taskOperationMap.put(JobTaskEnum.PAUSE.getCode(), (jobName, jobGroup) -> jobManagerUtil.pauseJob(jobName, jobGroup));
        taskOperationMap.put(JobTaskEnum.RECOVER.getCode(), (jobName, jobGroup) -> jobManagerUtil.resumeJob(jobName, jobGroup));
    }

    /**
     * Persists a new job and registers it with the scheduler.
     *
     * <p>The job class is resolved before the row is inserted, so an unknown or invalid
     * {@code beanClass} no longer leaves a row behind that has no scheduled job.
     *
     * @param jobManager job definition to save
     * @return the saved job definition
     * @throws RuntimeException if a live job with the same name already exists
     * @throws IllegalArgumentException if {@code beanClass} is empty or is not a {@link Job}
     */
    @SneakyThrows
    @Override
    public QuartzJobManager saveJobManager(QuartzJobManager jobManager) {
        long count = count(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getJobName, jobManager.getJobName())
                .eq(QuartzJobManager::getDeleted, NOT_DELETED));
        if (count > 0) {
            throw new RuntimeException("定时任务名称已存在");
        }
        // A missing class is rethrown unchanged (ClassNotFoundException) via @SneakyThrows.
        Class<? extends Job> jobClass = resolveJobClass(jobManager.getBeanClass());
        jobManager.setDeleted(NOT_DELETED);
        jobManager.setJobStatus(JobStatusEnum.NORMAL.getCode());
        save(jobManager);
        scheduleJob(jobManager, jobClass);
        return jobManager;
    }

    /**
     * Applies a run/pause/recover operation to a stored job.
     *
     * @param operation operation code, one of the {@link JobTaskEnum} codes
     * @param id id of the job record
     * @return {@code true} when the operation completes
     * @throws RuntimeException if the job does not exist, the operation code is unknown,
     *         or the operation is not allowed in the job's current status
     */
    @Override
    public Boolean taskOperation(Integer operation, Integer id) {
        QuartzJobManager job = getById(id);
        if (Objects.isNull(job)) {
            throw new RuntimeException("定时任务不存在");
        }
        BiConsumer<String, String> action = taskOperationMap.get(operation);
        if (Objects.isNull(action)) {
            throw new RuntimeException("不支持的任务操作");
        }
        taskOperationVerify(operation, job);
        action.accept(job.getJobName(), job.getJobGroup());
        // RUN only fires the job once and does not resume a paused job, so the stored
        // status is left as it is; PAUSE and RECOVER change it.
        if (!JobTaskEnum.RUN.getCode().equals(operation)) {
            job.setJobStatus(operationToStatus(operation));
            // The row already exists: update it. save() would issue an INSERT.
            updateById(job);
        }
        return true;
    }

    /**
     * Soft-deletes the given jobs and removes them from the scheduler. Unknown ids are skipped.
     *
     * @param ids ids of the job records to delete; must not be empty
     * @return {@code true} when the operation completes
     */
    @Override
    public Boolean deleteById(List<Integer> ids) {
        Assert.notEmpty(ids, "ids不能为空");
        // Sequential on purpose: each step is a database or scheduler call.
        for (Integer id : ids) {
            QuartzJobManager job = getById(id);
            if (Objects.nonNull(job)) {
                softDelete(job);
            }
        }
        return true;
    }

    /**
     * Soft-deletes the live job with the given name and group and removes it from the scheduler.
     *
     * @param jobName job name; must not be empty
     * @param jobGroup job group; must not be empty
     * @return {@code true} when the operation completes, also when no such job exists
     * @throws RuntimeException if {@code jobName} or {@code jobGroup} is empty
     */
    @Override
    public Boolean deleteByJobName(String jobName, String jobGroup) {
        if (StrUtil.isEmpty(jobName) || StrUtil.isEmpty(jobGroup)) {
            // The old message named only the group although the name is checked too.
            throw new RuntimeException("任务名称和任务分组不能为空");
        }
        QuartzJobManager one = getOne(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getJobName, jobName)
                .eq(QuartzJobManager::getJobGroup, jobGroup)
                .eq(QuartzJobManager::getDeleted, NOT_DELETED));
        if (Objects.nonNull(one)) {
            softDelete(one);
        }
        return true;
    }

    /**
     * Loads the stored jobs into the scheduler.
     *
     * <p>Selects live rows that are either not one-off jobs or one-off jobs whose start time
     * is still in the future. The previous query OR-ed that condition with
     * {@code deleted = 0}, which made it a no-op and re-registered expired one-off jobs.
     * Paused recurring jobs are not registered.
     *
     * @return {@code true} when the operation completes
     */
    @Override
    public Boolean initJob() {
        List<QuartzJobManager> jobs = list(Wrappers.<QuartzJobManager>lambdaQuery()
                .eq(QuartzJobManager::getDeleted, NOT_DELETED)
                .and(wrapper -> wrapper
                        .ne(QuartzJobManager::getJobType, JobTypeEnum.ONE_OFF_TASK.getCode())
                        .or()
                        .gt(QuartzJobManager::getStartTime, DateUtil.now())));
        for (QuartzJobManager job : jobs) {
            boolean oneOff = JobTypeEnum.ONE_OFF_TASK.getCode().equals(job.getJobType());
            boolean active = JobStatusEnum.NORMAL.getCode().equals(job.getJobStatus());
            if (!oneOff && !active) {
                continue;
            }
            try {
                scheduleJob(job, resolveJobClass(job.getBeanClass()));
            } catch (ClassNotFoundException | IllegalArgumentException e) {
                // One bad row must not keep the remaining jobs from loading.
                log.error("获取BeanClass错误: {}", job.getBeanClass(), e);
            }
        }
        return true;
    }

    /** Registers the job as a one-off or a cron job, depending on its type. */
    private void scheduleJob(QuartzJobManager job, Class<? extends Job> jobClass) {
        if (JobTypeEnum.ONE_OFF_TASK.getCode().equals(job.getJobType())) {
            jobManagerUtil.addSingleCommonJob(job.getJobName(), job.getJobGroup(), job.getStartTime(), jobClass);
        } else {
            jobManagerUtil.addCommonJob(job.getJobName(), job.getJobGroup(), job.getCronExpression(), jobClass);
        }
    }

    /** Marks the row as deleted and removes the job from the scheduler. */
    private void softDelete(QuartzJobManager job) {
        job.setDeleted(DELETED);
        updateById(job);
        jobManagerUtil.deleteJob(job.getJobName(), job.getJobGroup());
    }

    /** Loads the class named by {@code beanClass} and checks that it implements {@link Job}. */
    private Class<? extends Job> resolveJobClass(String beanClass) throws ClassNotFoundException {
        if (StrUtil.isEmpty(beanClass)) {
            throw new IllegalArgumentException("beanClass must not be empty");
        }
        Class<?> candidate = Class.forName(beanClass);
        if (!Job.class.isAssignableFrom(candidate)) {
            throw new IllegalArgumentException(beanClass + " does not implement org.quartz.Job");
        }
        return candidate.asSubclass(Job.class);
    }

    /** Maps an operation code to the status stored afterwards: PAUSE pauses, anything else is normal. */
    private Integer operationToStatus(Integer operation) {
        if (JobTaskEnum.PAUSE.getCode().equals(operation)) {
            return JobStatusEnum.PAUSE.getCode();
        }
        return JobStatusEnum.NORMAL.getCode();
    }

    /** Rejects recovering a job that is already normal and pausing one that is not normal. */
    private void taskOperationVerify(Integer operation, QuartzJobManager jobManager) {
        boolean normal = JobStatusEnum.NORMAL.getCode().equals(jobManager.getJobStatus());
        if (normal && JobTaskEnum.RECOVER.getCode().equals(operation)) {
            throw new RuntimeException("任务正常状态下,无法使用恢复");
        }
        if (!normal && JobTaskEnum.PAUSE.getCode().equals(operation)) {
            throw new RuntimeException("任务暂停状态下,无法使用暂停");
        }
    }
}

9. 启动初始化加载任务

/**
 * Loads the stored jobs into the scheduler once the application has started.
 *
 * @author Elex
 */
@Component
public class JobApplicationRunner implements ApplicationRunner {

    // The original referenced a non-existent type (JobManagerSreviceb) and method (ininJob);
    // the service declared in this article is QuartzJobManagerService with initJob().
    @Autowired
    private QuartzJobManagerService quartzJobManagerService;

    /**
     * Initializes the jobs by delegating to {@code QuartzJobManagerService.initJob()}.
     *
     * @param args application arguments (unused)
     * @throws Exception if job initialization fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        quartzJobManagerService.initJob();
    }
}