Spring 批处理:无法完全计算出条件流

Spring Batch: Can't quite work out Conditional Flow

编辑以更新我的最新配置:这对我的用例来说是否正确?

我的流程应该是这样的:

  1. FileRetrievingTasklet 检索远程文件并将 该文件在执行上下文中的“类型”。
  2. 如果文件类型为“YEARLY”,则继续 yearlyStep()
  3. 如果文件类型为“QUARTERLY”,则继续 quarterlyStep()
  4. 完成。

这看起来很简单,但是我有的东西不起作用。在 tasklet 步骤之后,作业以 FAILED 结束。

这是我的工作配置:

@Bean
public Job fundsDistributionJob() {
    return jobBuilderFactory
            .get("fundsDistributionJob")
            .start(retrieveFileStep(stepBuilderFactory))
            .on("YEARLY").to(yearEndStep())
            .from(retrieveFileStep(stepBuilderFactory))
            .on("QUARTERLY").to(quarterlyStep())
            .end()
            .listener(new FileWorkerJobExecutionListener())
            .build();
}

其中一个步骤:

   @Bean
    public Step quarterlyStep() {
        return stepBuilderFactory.get("quarterlyStep")
                .<Item, Item>chunk(10)
                .reader(quarterlyReader())
                .processor(processor())
                .writer(writer())
                .listener(new StepItemReadListener())
                .faultTolerant()
                .skipPolicy(new DistSkipPolicy())
                .build();
    }

有人可以告诉我缺少什么吗?

决策者的方法(在您编辑之前)是可行的方法。您的流程定义有问题。这是一个按照您描述的方式工作的示例:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    public MyJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    @Bean
    public Step retrieveFileStep() {
        return steps.get("retrieveFileStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Downloading file..");
                    chunkContext.getStepContext().getStepExecution()
                            .getExecutionContext().put("type", Type.YEARLY);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
    
    @Bean
    public JobExecutionDecider fileMapperDecider() {
        return (jobExecution, stepExecution) -> {
            Type type = (Type) stepExecution.getExecutionContext().get("type");
            return new FlowExecutionStatus(type == Type.YEARLY ? "yearly" : "quarterly");
        };
    }

    @Bean
    public Step yearlyStep() {
        return steps.get("yearlyStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("running yearlyStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step quarterlyStep() {
        return steps.get("quarterlyStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("running quarterlyStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(retrieveFileStep())
                .next(fileMapperDecider())
                .from(fileMapperDecider()).on("yearly").to(yearlyStep())
                .from(fileMapperDecider()).on("quarterly").to(quarterlyStep())
                .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
    
    enum Type {
        YEARLY, QUARTERLY
    }

}

它打印:

Downloading file..
running yearlyStep

如果将执行上下文中的 type 属性更改为 retrieveFileStep 中的 Type.QUARTERLY,它会打印:

Downloading file..
running quarterlyStep