Java Spring Boot scheduled jobs

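I have an application with a few jobs, and I want to keep their configuration in the database so that they are easier to maintain.

The application is structured as follows:

Database: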

CREATE TABLE jobs_config (
    id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
    job_name varchar NOT NULL,
    scheduled_value varchar NOT NULL,
    CONSTRAINT jobs_config_pk PRIMARY KEY (id),
    CONSTRAINT jobs_config_name_un UNIQUE (job_name)
);
insert into jobs_config(job_name, scheduled_value) values('DeleteExpiredTokenJob', '0 0 10 * * MON');

Java Spring Boot:

@Entity
@Table(name = "jobs_config")
public class JobsConfig {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "job_name")
    private String jobName;

    private String scheduledValue;

    public JobsConfig() {
    }

    public JobsConfig(String jobName, String scheduledValue) {
        this.jobName = jobName;
        this.scheduledValue = scheduledValue;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getScheduledValue() {
        return scheduledValue;
    }

    public void setScheduledValue(String scheduledValue) {
        this.scheduledValue = scheduledValue;
    }
}

@Service
public class JobSchedulerService implements SchedulingConfigurer {

    private static Logger logger = LoggerFactory.getLogger(JobSchedulerService.class);

    @Autowired
    JobsConfigRepository jobsConfigRepository;

    @Autowired
    DeleteExpiredTokenJob deleteExpiredTokenJob;

    @Autowired
    TestJob testJob;

    @Bean
    public TaskScheduler poolScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
        scheduler.setPoolSize(1);
        scheduler.initialize();
        return scheduler;
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setTaskScheduler(poolScheduler());
        listJobList();
//        refreshJobList( poolScheduler());
    }

    public void listJobList() {
        List<JobsConfig>  jobsList = jobsConfigRepository.findAll();
        for (JobsConfig jobName : jobsList) {
            switch (jobName.getJobName()) {
                case "DeleteExpiredTokenJob":
                    scheduleJob(poolScheduler(), deleteExpiredTokenJob, jobName.getJobName());
                    break;
                case "TestJob":
                    scheduleJob(poolScheduler(), testJob, jobName.getJobName());
                    break;
                default:
                    logger.info(String.format("JOB NOT FOUND [%s]", jobName.getJobName()));
            }
        }
    }

    public void scheduleJob(TaskScheduler scheduler, JobInterface jobInterface, String jobName){
        scheduler.schedule(new Runnable(){
            @Override
            public void run() {
                jobInterface.jobCode();
            }
        }, new Trigger(){
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                Optional <JobsConfig> job = jobsConfigRepository.findByJobName(jobName);
                String cronExp = job.get().getScheduledValue();
                return new CronTrigger(cronExp).nextExecutionTime(triggerContext);
            }
        });
    }
    //THIS IS WHAT I'VE TRIED !
    /*private void refreshJobList(TaskScheduler scheduler){
        scheduler.schedule(new Runnable(){
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+" The Task2 executed at "+ new Date());
                listJobList();
            }
        }, new Trigger(){
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                String cronExp="0/10 * * * * ?";//Can be pulled from a db . This will run every 10 seconds
                return new CronTrigger(cronExp).nextExecutionTime(triggerContext);
            }
        });
    }*/

}

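My idea is that every time I add a record to the jobs_config table, the change should be reflected on the Java side.

So if I add a new job to the table, I want it to become available without restarting the application (assuming, of course, that the Java code for DeleteExpiredTokenJob already exists).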

insert into jobs_config(job_name, scheduled_value) values('DeleteExpiredTokenJob', '0/5 * * * * ?');

Basically, I need to refresh the list that the listJobList() method loads with List<JobsConfig> jobsList = jobsConfigRepository.findAll();

How can I do that?

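I see you have already tried to create a job that refreshes the other jobs, without success.

I don't see an easy way to make a new job available right after it is added to the database, since it would need a new @Autowired job to be injected. But I think I know how to track updated jobs in the database, that is, changes to the scheduled_value column.

  1. Keep the old JobsConfig values in the service.

  2. Check whether anything has changed.

    2.a. If it has, cancel the job's next execution and schedule an updated one.

  3. Sleep for a few minutes until the next check (as you did in the commented-out code).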

    Map<String, JobsConfig> oldJobsConfigs = new HashMap<>();
    Map<String, ScheduledFuture<?>> activeJobs = new HashMap<>();

    public void listJobList() {
        List<JobsConfig> jobsList = jobsConfigRepository.findAll();
        for (JobsConfig jobName : jobsList) {
            // If this job was there before and has not changed, skip it and move on to the next one.
            JobsConfig oldConfig = oldJobsConfigs.get(jobName.getJobName());
            if (oldConfig != null && oldConfig.getScheduledValue().equals(jobName.getScheduledValue()))
                continue;

            // Cancel previous execution, if any.
            if (activeJobs.containsKey(jobName.getJobName())) {
                ScheduledFuture<?> job = activeJobs.get(jobName.getJobName());
                job.cancel(false);
                try {
                    job.get(); // Warning! If the job is running, blocks current thread until the job finishes. If it has an endless loop, it will block current thread forever.
                } catch (CancellationException e) {
                    // Do nothing, this is good, we did not spend time waiting for the job to finish.
                } catch (InterruptedException | ExecutionException e) {
                    // Log it?
                }
            }

            ScheduledFuture<?> newJob = null;
            switch (jobName.getJobName()) {
                case "DeleteExpiredTokenJob":
                    newJob = scheduleJob(poolScheduler(), deleteExpiredTokenJob, jobName.getJobName());
                    break;
                case "TestJob":
                    newJob = scheduleJob(poolScheduler(), testJob, jobName.getJobName());
                    break;
                default:
                    logger.info(String.format("JOB NOT FOUND [%s]", jobName.getJobName()));
            }

            if (newJob != null)
                activeJobs.put(jobName.getJobName(), newJob);

            // Remember the current config so that the next check can detect changes.
            oldJobsConfigs.put(jobName.getJobName(), jobName);
        }
    }
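
The loop above only visits rows that still exist in jobs_config, so a row that was deleted would keep its scheduled future. As a rough sketch of one way to cover that case as well, the comparison can be pulled out into a small helper that works on two snapshots of job name to cron expression. The class JobConfigDiff and its fields are hypothetical names, not part of the original code, and the helper does no scheduling itself:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: compares two snapshots of job name -> cron expression. */
final class JobConfigDiff {
    final Set<String> toSchedule = new TreeSet<>(); // added or changed: (re)schedule these
    final Set<String> toCancel = new TreeSet<>();   // changed or removed: cancel the old future

    static JobConfigDiff between(Map<String, String> previous, Map<String, String> current) {
        JobConfigDiff diff = new JobConfigDiff();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            String oldCron = previous.get(entry.getKey());
            if (oldCron == null) {
                diff.toSchedule.add(entry.getKey());      // row was added
            } else if (!oldCron.equals(entry.getValue())) {
                diff.toCancel.add(entry.getKey());        // cron expression changed
                diff.toSchedule.add(entry.getKey());
            }
        }
        for (String name : previous.keySet()) {
            if (!current.containsKey(name)) {
                diff.toCancel.add(name);                  // row was deleted
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        Map<String, String> previous = new HashMap<>();
        previous.put("DeleteExpiredTokenJob", "0 0 10 * * MON");
        previous.put("TestJob", "0/10 * * * * ?");

        Map<String, String> current = new HashMap<>();
        current.put("DeleteExpiredTokenJob", "0/5 * * * * ?"); // changed; TestJob is gone

        JobConfigDiff diff = between(previous, current);
        System.out.println("schedule: " + diff.toSchedule);
        System.out.println("cancel:   " + diff.toCancel);
    }
}
```

In the service, the names in toCancel would be looked up in activeJobs and cancelled, and the names in toSchedule would go through the existing switch.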

And change the scheduleJob signature so that it returns whatever scheduler.schedule returns.

    public ScheduledFuture<?> scheduleJob(TaskScheduler scheduler, JobInterface jobInterface, String jobName) {
        return scheduler.schedule(new Runnable() {
// ... unchanged
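
The point of returning the ScheduledFuture is that the caller gets a handle it can cancel later. To illustrate only that cancel-and-reschedule pattern, here is a minimal sketch that deliberately uses plain java.util.concurrent instead of Spring's TaskScheduler, and a fixed period instead of a cron trigger. The class ReschedulingDemo and its method signatures are made up for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for JobSchedulerService, using only java.util.concurrent. */
final class ReschedulingDemo {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final Map<String, ScheduledFuture<?>> activeJobs = new ConcurrentHashMap<>();

    /** Schedules the job at a fixed period, cancelling any earlier schedule with the same name. */
    ScheduledFuture<?> scheduleJob(String jobName, Runnable job, long periodMillis) {
        ScheduledFuture<?> previous = activeJobs.get(jobName);
        if (previous != null) {
            previous.cancel(false); // false: do not interrupt a run that is already in progress
        }
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(job, 0, periodMillis, TimeUnit.MILLISECONDS);
        activeJobs.put(jobName, future);
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        ReschedulingDemo demo = new ReschedulingDemo();
        ScheduledFuture<?> first = demo.scheduleJob("TestJob", () -> System.out.println("slow"), 1000);
        // Scheduling under the same name replaces the first schedule.
        ScheduledFuture<?> second = demo.scheduleJob("TestJob", () -> System.out.println("fast"), 200);
        System.out.println("first cancelled:  " + first.isCancelled());
        System.out.println("second cancelled: " + second.isCancelled());
        demo.shutdown();
    }
}
```

With Spring, the same bookkeeping applies to the future returned by scheduler.schedule(runnable, trigger).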

Hope this helps. :D

Update: if you end up with more than ten or so jobs someday, you could tweak JobInterface a little so that it returns its own job name:

    public interface JobInterface {
        // ... old methods
        String getJobName(); // Consider switching to enums? 
    }

And let Spring autowire all implementations of JobInterface into JobSchedulerService:

@Service
public class JobSchedulerService implements SchedulingConfigurer {
    @Autowired
    JobsConfigRepository jobsConfigRepository;

    @Autowired
    List<JobInterface> allJobs;

Then, instead of a switch with a large number of entries, you would write:

    ScheduledFuture<?> newJob = null;
    for(JobInterface job : allJobs)
        if (job.getJobName().equals(jobName.getJobName()))
            newJob = scheduleJob(/*arguments*/);

    if (newJob == null)
        logger.warn(/*swear loudly :)*/);

That is, only if you want to get rid of this switch. If you plan to have just a few jobs, keeping it as it is would probably be fine too.
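
If the list of jobs grows, the linear search over allJobs could also be replaced by a map built once from that list. The following is a sketch under the assumption that JobInterface has the jobCode() method used in the question plus the proposed getJobName(). The class JobRegistryDemo and the job(...) factory are hypothetical and exist only so the example runs without Spring:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class JobRegistryDemo {
    /** Mirrors the JobInterface discussed above: jobCode() plus the proposed getJobName(). */
    interface JobInterface {
        void jobCode();
        String getJobName();
    }

    /** Indexes jobs by name; Collectors.toMap throws IllegalStateException on duplicate names. */
    static Map<String, JobInterface> indexByName(List<JobInterface> allJobs) {
        return allJobs.stream()
                .collect(Collectors.toMap(JobInterface::getJobName, Function.identity()));
    }

    /** Hypothetical factory so that the demo does not need Spring beans. */
    static JobInterface job(String name) {
        return new JobInterface() {
            @Override public void jobCode() { System.out.println("running " + name); }
            @Override public String getJobName() { return name; }
        };
    }

    public static void main(String[] args) {
        Map<String, JobInterface> jobsByName =
                indexByName(Arrays.asList(job("DeleteExpiredTokenJob"), job("TestJob")));

        JobInterface found = jobsByName.get("TestJob");
        if (found != null) {
            found.jobCode();
        }
        // A name with no implementation simply has no entry; this is where the warning would go.
        System.out.println("UnknownJob present: " + jobsByName.containsKey("UnknownJob"));
    }
}
```

A side effect of building the map up front is that two implementations reporting the same name fail fast instead of one silently winning.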