如何在运行时创建和启动 spring 批处理作业
How to create and launch spring batch jobs at runtime
我们需要执行从 1 个数据库到其他数据库的数据移动,并探索 spring 相同的批处理。我们应用程序的用户选择源和目标数据源以及需要移动数据的 table 列表。
在以下方面需要帮助:
- 构建作业所需的信息在 运行 时来自我们的 Web 应用程序 - 其中包括数据源详细信息和 table 名称列表。我们想通过将这些详细信息发送到作业构建器模块并使用 JobLauncher 启动它来创建一个新作业。我们如何编写这个工作生成器模块?
- 我们可能有多个用户同时提出数据移动请求,因此需要一种方法来创建多个作业并运行它们按table顺序排列。
我们使用了基于 Java 的配置来创建作业并从 Web 容器启动它。配置如下
@Bean
public Job loadDataJob(JobCompletionNotificationListener listener) {
RunIdIncrementer inc = new RunIdIncrementer();
inc.setKey(new Date().toString());
JobBuilder builder = jobBuilderFactory.get("loadDataJob")
.incrementer(inc)
.listener(listener);
SimpleJobBuilder simpleBuilder = builder.start(preExecute());
for(String s : getTables()){
simpleBuilder.next(etlTable(s));
}
simpleBuilder.next(postExecute());
return simpleBuilder.build();
}
@Bean
@Scope("prototype")
public Step etlTable(String tableName) {
return stepBuilderFactory.get(tableName)
.<Map<String,Object>, Map<String,Object>> chunk(1000)
.reader(dbDataReader(tableName))
.processor(processor())
.writer(dbDataWriter(tableName))
.build();
}
目前我们已经将源和目标数据源的详细信息硬编码到各自的 bean 中。 getTables() returns 需要移动数据的 table 列表(硬编码)。
启动作业的 RestController
@RestController
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/launchjob")
public String handle() throws Exception {
try {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
}
return "Done";
}
}
关于你的第一个问题,你肯定要使用JavaConfiguration。此外,如果您想创建一个具有动态步骤数的作业(例如,您必须复制每个 table 的步骤),则不应将步骤定义为 spring beans。
我已经写了几个关于如何动态创建工作的问题的答案。看看他们,他们可能会有帮助
已编辑
关于第二个问题的一些说明:
首先,您使用的是普通的 JobLauncher,我假设您实例化了 SimpleJobLauncher。这意味着,您可以提供带有作业参数的作业,如您在上面的代码中所示。但是,提供的 "job" 不必是 "SpringBean" 实例,因此您不必自动装配它,因此,您可以按照我在问题答案中建议的那样使用创建方法上面提到了。
其次,如果您为每个请求动态创建作业实例,则无需将整个配置作为作业参数传递,因为您可以将 "configuration properties" 之类的数据源和 table 传递给直接作为参数复制到您的 "createJob" 方法。如果您事先不知道所有可能的数据源,您甚至可以创建数据源实例 "on the fly"。
第三,我会将每个请求都视为"single run",而不能是"restarted"。因此,我只是将一些 "meta information" 放入作业参数中,例如用户、date/time、数据源名称(url)和要复制的 table 列表。我会将这种信息用作发出请求的一种 logging/auditing,但我不会将作业参数实例用作作业本身内部的控制参数(同样,您可以在期间传递这些参数的值作业的构造时间和步骤,将它们传递给您的创建方法,因此作业的结构是根据您的参数创建的,因此,在运行时期间——当您可以访问作业参数时——无需根据工作参数)。
最后,如果请求失败(意味着作业因错误退出),只需执行一个新请求即可重试,但此请求将是一个完整的新请求,而不是重新启动已执行的请求作业启动(因为我会将请求时间添加到我的作业参数中,所以每次启动都是唯一的启动)。
已编辑 2:
不将 Job 创建为 Bean 并不意味着不使用自动装配。这是一个示例,因为我将构造我的 Beans。
@Component
@EnableBatchProcessing
@Import() // list with imports as neede
public class JobCreatorComponent {
@Autowire
private StepBuilderFactory stepBuilder;
@Autowire
private JobBuilderFactory jobBuilder;
public Job createJob(all the parameters you need) {
return jobBuilder.get(). ....
}
}
@RestController
@Import(JobCreatorComponent.class)
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
JobCreatorComponent jobCreator;
@RequestMapping("/launchjob")
public String handle() throws Exception {
try {
Job job = jobCreator.createJob(... params ...);
JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
}
return "Done";
}
}
通过在项目上使用@JobScopereader 无需在 运行 时手动执行操作,只需在每次与控制器交互时使用 @Jobscope 注释您各自的 reader将获得新的记录处理。
这是一种按需作业类型,您可以在其中执行作业以实现目标,例如执行数据库迁移或获取特定报告。
我们需要执行从 1 个数据库到其他数据库的数据移动,并探索 spring 相同的批处理。我们应用程序的用户选择源和目标数据源以及需要移动数据的 table 列表。
在以下方面需要帮助:
- 构建作业所需的信息在 运行 时来自我们的 Web 应用程序 - 其中包括数据源详细信息和 table 名称列表。我们想通过将这些详细信息发送到作业构建器模块并使用 JobLauncher 启动它来创建一个新作业。我们如何编写这个工作生成器模块?
- 我们可能有多个用户同时提出数据移动请求,因此需要一种方法来创建多个作业并运行它们按table顺序排列。
我们使用了基于 Java 的配置来创建作业并从 Web 容器启动它。配置如下
@Bean
public Job loadDataJob(JobCompletionNotificationListener listener) {
RunIdIncrementer inc = new RunIdIncrementer();
inc.setKey(new Date().toString());
JobBuilder builder = jobBuilderFactory.get("loadDataJob")
.incrementer(inc)
.listener(listener);
SimpleJobBuilder simpleBuilder = builder.start(preExecute());
for(String s : getTables()){
simpleBuilder.next(etlTable(s));
}
simpleBuilder.next(postExecute());
return simpleBuilder.build();
}
@Bean
@Scope("prototype")
public Step etlTable(String tableName) {
return stepBuilderFactory.get(tableName)
.<Map<String,Object>, Map<String,Object>> chunk(1000)
.reader(dbDataReader(tableName))
.processor(processor())
.writer(dbDataWriter(tableName))
.build();
}
目前我们已经将源和目标数据源的详细信息硬编码到各自的 bean 中。 getTables() returns 需要移动数据的 table 列表(硬编码)。
启动作业的 RestController
@RestController
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/launchjob")
public String handle() throws Exception {
try {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
}
return "Done";
}
}
关于你的第一个问题,你肯定要使用JavaConfiguration。此外,如果您想创建一个具有动态步骤数的作业(例如,您必须复制每个 table 的步骤),则不应将步骤定义为 spring beans。
我已经写了几个关于如何动态创建工作的问题的答案。看看他们,他们可能会有帮助
已编辑
关于第二个问题的一些说明:
首先,您使用的是普通的 JobLauncher,我假设您实例化了 SimpleJobLauncher。这意味着,您可以提供带有作业参数的作业,如您在上面的代码中所示。但是,提供的 "job" 不必是 "SpringBean" 实例,因此您不必自动装配它,因此,您可以按照我在问题答案中建议的那样使用创建方法上面提到了。
其次,如果您为每个请求动态创建作业实例,则无需将整个配置作为作业参数传递,因为您可以将 "configuration properties" 之类的数据源和 table 传递给直接作为参数复制到您的 "createJob" 方法。如果您事先不知道所有可能的数据源,您甚至可以创建数据源实例 "on the fly"。
第三,我会将每个请求都视为"single run",而不能是"restarted"。因此,我只是将一些 "meta information" 放入作业参数中,例如用户、date/time、数据源名称(url)和要复制的 table 列表。我会将这种信息用作发出请求的一种 logging/auditing,但我不会将作业参数实例用作作业本身内部的控制参数(同样,您可以在期间传递这些参数的值作业的构造时间和步骤,将它们传递给您的创建方法,因此作业的结构是根据您的参数创建的,因此,在运行时期间——当您可以访问作业参数时——无需根据工作参数)。
最后,如果请求失败(意味着作业因错误退出),只需执行一个新请求即可重试,但此请求将是一个完整的新请求,而不是重新启动已执行的请求作业启动(因为我会将请求时间添加到我的作业参数中,所以每次启动都是唯一的启动)。
已编辑 2: 不将 Job 创建为 Bean 并不意味着不使用自动装配。这是一个示例,因为我将构造我的 Beans。
@Component
@EnableBatchProcessing
@Import() // list with imports as neede
public class JobCreatorComponent {
@Autowire
private StepBuilderFactory stepBuilder;
@Autowire
private JobBuilderFactory jobBuilder;
public Job createJob(all the parameters you need) {
return jobBuilder.get(). ....
}
}
@RestController
@Import(JobCreatorComponent.class)
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
JobCreatorComponent jobCreator;
@RequestMapping("/launchjob")
public String handle() throws Exception {
try {
Job job = jobCreator.createJob(... params ...);
JobParameters jobParameters = new JobParametersBuilder().addLong("time", new Date().getTime()).toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
}
return "Done";
}
}
通过在项目上使用@JobScopereader 无需在 运行 时手动执行操作,只需在每次与控制器交互时使用 @Jobscope 注释您各自的 reader将获得新的记录处理。
这是一种按需作业类型,您可以在其中执行作业以实现目标,例如执行数据库迁移或获取特定报告。