Spring 批量避免在 tasklet 之前启动 Reader 和 Writer

Spring Batch avoid launch Reader and Writer before tasklet

我正在使用 spring 批处理,有两个步骤的工作,第一步(tasklet)验证 header CSV,第二步读取 CSV 文件并写入另一个 CSV像这样的文件:

@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
    Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
    return new ClassifierCompositeItemWriterBuilder<POJO>()
            .classifier(classifier)
            .build();
}



@Bean
public Step readAndWriteCsvFile() throws Exception {
    return stepBuilderFactory.get("readAndWriteCsvFile")
            .<POJO, POJO>chunk(10000)
            .reader(ClassitemReader.itemReader())
            .processor(processor())
            .writer(classifierCompositeItemWriter())
            .build();
}

在读取 CSV 之前,我使用了 FlatFileItemReader(在 ClassitemReader 中)和 FlatFileItemWriter(在 ClassiItemWriter 中)。我通过这样的 tasklet 检查了 CSV 文件的 header 是否正确:

@Bean
public Step fileValidatorStep() {
    return stepBuilderFactory
            .get("fileValidatorStep")
            .tasklet(fileValidator)
            .build();
}

如果是这样,我处理从收到的 CSV 文件到另一个 CSV 文件的转换。

jobBuilderFactory 我检查 ExistStatus 是否来自 tasklet fileValidatorStep 是“已完成”以将进程转发到 readAndWriteCsvFile(),如果不是“已完成”和 tasklet fileValidatorStep return ExistStatus "ERROR" 作业结束并退出处理。

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(fileValidatorStep()).on("ERROR").end()
            .next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
            .end().build();
}

问题是,当我启动我的作业时,Bean readAndWriteCsvFile 首先运行 tasklet,这意味着 reader 的标准 Bean 和 spring 批处理的 writer 始终加载在我可以验证 header 并检查 ExistStatus 之前的生命周期,reader 仍在工作并读取文件并将数据放入另一个文件而不检查,因为在所有 tasklet 之前加载 Bean。

如何在 fileValidatorStep 之后启动 readAndWriteCsvFile 方法?

你不需要一个流程工作,一个简单的工作就足够了。这是一个简单的例子:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Step validationStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("validationStep")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        if(!isValid()) {
                            throw new Exception("Invalid file");
                        }
                        return RepeatStatus.FINISHED;
                    }

                    private boolean isValid() {
                        // TODO implement validation logic
                        return false;
                    }
                })
                .build();
    }

    @Bean
    public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("readAndWriteCsvFile")
                .<Integer, Integer>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("job")
                .start(validationStep(stepBuilderFactory))
                .next(readAndWriteCsvFile(stepBuilderFactory))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

在这个例子中,如果validationStep失败,则不会执行下一步。

我解决了我的问题,我用注释 @StepScope 更改了 Job class 配置中的 bean FlatFileItemReader 方法,现在这个 bean 只在我需要时加载它也应该避免在作业范围之外声明 FlatFileItemReader bean