Spring 批量避免在 tasklet 之前启动 Reader 和 Writer
Spring Batch avoid launch Reader and Writer before tasklet
我正在使用 spring 批处理,有两个步骤的工作,第一步(tasklet)验证 header CSV,第二步读取 CSV 文件并写入另一个 CSV像这样的文件:
@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
return new ClassifierCompositeItemWriterBuilder<POJO>()
.classifier(classifier)
.build();
}
@Bean
public Step readAndWriteCsvFile() throws Exception {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<POJO, POJO>chunk(10000)
.reader(ClassitemReader.itemReader())
.processor(processor())
.writer(classifierCompositeItemWriter())
.build();
}
在读取 CSV 之前,我使用了 FlatFileItemReader(在 ClassitemReader 中)和 FlatFileItemWriter(在 ClassiItemWriter 中)。我通过这样的 tasklet 检查了 CSV 文件的 header 是否正确:
@Bean
public Step fileValidatorStep() {
return stepBuilderFactory
.get("fileValidatorStep")
.tasklet(fileValidator)
.build();
}
如果是这样,我处理从收到的 CSV 文件到另一个 CSV 文件的转换。
在 jobBuilderFactory
我检查 ExistStatus 是否来自 tasklet fileValidatorStep
是“已完成”以将进程转发到 readAndWriteCsvFile()
,如果不是“已完成”和 tasklet fileValidatorStep
return ExistStatus "ERROR" 作业结束并退出处理。
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(fileValidatorStep()).on("ERROR").end()
.next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
.end().build();
}
问题是,当我启动我的作业时,Bean readAndWriteCsvFile
首先运行 tasklet,这意味着 reader 的标准 Bean 和 spring 批处理的 writer 始终加载在我可以验证 header 并检查 ExistStatus 之前的生命周期,reader 仍在工作并读取文件并将数据放入另一个文件而不检查,因为在所有 tasklet 之前加载 Bean。
如何在 fileValidatorStep
之后启动 readAndWriteCsvFile
方法?
你不需要一个流程工作,一个简单的工作就足够了。这是一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {
@Bean
public Step validationStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("validationStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if(!isValid()) {
throw new Exception("Invalid file");
}
return RepeatStatus.FINISHED;
}
private boolean isValid() {
// TODO implement validation logic
return false;
}
})
.build();
}
@Bean
public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
.writer(items -> items.forEach(System.out::println))
.build();
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(validationStep(stepBuilderFactory))
.next(readAndWriteCsvFile(stepBuilderFactory))
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
在这个例子中,如果validationStep
失败,则不会执行下一步。
我解决了我的问题,我用注释 @StepScope
更改了 Job class 配置中的 bean FlatFileItemReader
方法,现在这个 bean 只在我需要时加载它也应该避免在作业范围之外声明 FlatFileItemReader bean
我正在使用 spring 批处理,有两个步骤的工作,第一步(tasklet)验证 header CSV,第二步读取 CSV 文件并写入另一个 CSV像这样的文件:
@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
return new ClassifierCompositeItemWriterBuilder<POJO>()
.classifier(classifier)
.build();
}
@Bean
public Step readAndWriteCsvFile() throws Exception {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<POJO, POJO>chunk(10000)
.reader(ClassitemReader.itemReader())
.processor(processor())
.writer(classifierCompositeItemWriter())
.build();
}
在读取 CSV 之前,我使用了 FlatFileItemReader(在 ClassitemReader 中)和 FlatFileItemWriter(在 ClassiItemWriter 中)。我通过这样的 tasklet 检查了 CSV 文件的 header 是否正确:
@Bean
public Step fileValidatorStep() {
return stepBuilderFactory
.get("fileValidatorStep")
.tasklet(fileValidator)
.build();
}
如果是这样,我处理从收到的 CSV 文件到另一个 CSV 文件的转换。
在 jobBuilderFactory
我检查 ExistStatus 是否来自 tasklet fileValidatorStep
是“已完成”以将进程转发到 readAndWriteCsvFile()
,如果不是“已完成”和 tasklet fileValidatorStep
return ExistStatus "ERROR" 作业结束并退出处理。
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(fileValidatorStep()).on("ERROR").end()
.next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
.end().build();
}
问题是,当我启动我的作业时,Bean readAndWriteCsvFile
首先运行 tasklet,这意味着 reader 的标准 Bean 和 spring 批处理的 writer 始终加载在我可以验证 header 并检查 ExistStatus 之前的生命周期,reader 仍在工作并读取文件并将数据放入另一个文件而不检查,因为在所有 tasklet 之前加载 Bean。
如何在 fileValidatorStep
之后启动 readAndWriteCsvFile
方法?
你不需要一个流程工作,一个简单的工作就足够了。这是一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {
@Bean
public Step validationStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("validationStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if(!isValid()) {
throw new Exception("Invalid file");
}
return RepeatStatus.FINISHED;
}
private boolean isValid() {
// TODO implement validation logic
return false;
}
})
.build();
}
@Bean
public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
.writer(items -> items.forEach(System.out::println))
.build();
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(validationStep(stepBuilderFactory))
.next(readAndWriteCsvFile(stepBuilderFactory))
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
在这个例子中,如果validationStep
失败,则不会执行下一步。
我解决了我的问题,我用注释 @StepScope
更改了 Job class 配置中的 bean FlatFileItemReader
方法,现在这个 bean 只在我需要时加载它也应该避免在作业范围之外声明 FlatFileItemReader bean