Spring Batch job parameters in StepScope

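I have a Spring Batch job (v4.3.1, no XML configuration) that loads a CSV file into a database. The jobs are launched by a REST controller, so they do not start automatically at startup. Everything works fine when the file name is known at application startup.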

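But when I try to pass a jobParameter to the step (and annotate it with @StepScope), I get an exception at application startup (the job has not been launched at that point). The exception I get is: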

Caused by: java.lang.IllegalArgumentException: Path must not be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.2.jar!/:5.3.2]
    at org.springframework.core.io.FileSystemResource.<init>(FileSystemResource.java:80) ~[spring-core-5.3.2.jar!/:5.3.2]
    at TaskImportJobConfiguration.importTasksReader(TaskImportJobConfiguration.java:66) ~[classes!/:na]
    at TaskImportJobConfiguration.step1(TaskImportJobConfiguration.java:109) ~[classes!/:na]
    at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a.CGLIB$step1(<generated>) ~[classes!/:na]
    at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a$$FastClassBySpringCGLIB$87bf2f.invoke(<generated>) ~[classes!/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.3.2.jar!/:5.3.2]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.3.2.jar!/:5.3.2]
    at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a.step1(<generated>) ~[classes!/:na]

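The step1() method calls .reader(importTasksReader(null)). This should work, since the parameter is supposed to be bound at runtime. But I noticed that this is not what happens. At the moment step1() is called (so at application startup), the FileSystemResource passed to my ItemReader checks the parameter, which is null at that time, and the exception is thrown.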

What am I doing wrong?

Here is my code:

import java.beans.PropertyEditor;
import java.beans.PropertyEditorSupport;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

@Configuration
public class TaskImportJobConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(TaskImportJobConfiguration.class);
    
    private static final String QUERY_INSERT_TASK = "INSERT INTO TASK (N_I_IDF, C_I_TASK_CODE, C_TASK_TYPE_CODE) "
            + "VALUES (NEXTVAL FOR TASK_SEQ, concat('BC',varchar_format(NEXTVAL FOR TASK_SEQ,'000000000')), :taskType)";
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job importTasksJob(Step step1) {
        return jobBuilderFactory.get("importTasksJob").incrementer(new RunIdIncrementer())
                .flow(step1).end().build();
    }

    private static final DateTimeFormatter DATE_FORMATTER=DateTimeFormatter.ofPattern("dd/MM/yyyy");
    
    @StepScope
    public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {
        logger.info("IN-ImportTasksReader ("+inputFile+")");
        return new FlatFileItemReaderBuilder<CsvTask>()
                .name("taskItemReader")//
                .resource(new FileSystemResource(inputFile))//
                .linesToSkip(1)//
                .delimited()//
                .delimiter(";")
                .names(new String[] { "taskType"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvTask>() {
                    {
                        setTargetType(CsvTask.class);
                        // 8< some custom editors >8
                    }
                }).build();
    }
    

    @Bean
    public JdbcBatchItemWriter<CsvTask> taskWriter(DataSource datasource, NamedParameterJdbcTemplate jdbcTemplate) {
        JdbcBatchItemWriter<CsvTask> itemwriter=new JdbcBatchItemWriter<>();
        itemwriter.setDataSource(datasource);
        itemwriter.setJdbcTemplate(jdbcTemplate);
        itemwriter.setSql(QUERY_INSERT_TASK);
        itemwriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return itemwriter;
    }
    
    @Bean
    public Step step1(JdbcBatchItemWriter<CsvTask> taskWriter) {
        return stepBuilderFactory //
                .get("importTasksJob.step1")//
                .<CsvTask, CsvTask>chunk(10) //
                .reader(importTasksReader(null)) //
                .processor(new PassThroughItemProcessor<CsvTask>())
                .writer(taskWriter) //
                .build();
    }       
}

Update: when I add the @Bean annotation to importTasksReader, like this:

    @Bean
    @StepScope
    public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {

I get the following exception:

2021-02-15 15:10:06.531  WARN 1 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'scopedTarget.importTasksReader' defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class]: 
Cannot register bean definition [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class]] for bean 'scopedTarget.importTasksReader': 
There is already [Root bean: class [null]; scope=step; abstract=false; lazyInit=null; autowireMode=3; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=taskImportJobConfiguration; factoryMethodName=importTasksReader; initMethodName=null; destroyMethodName=(inferred); defined in class path resource [TaskImportJobConfiguration.class]] bound.
2021-02-15 15:10:06.577 ERROR 1 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   :

***************************
 APPLICATION FAILED TO START
 ***************************

 Description:
 The bean 'scopedTarget.importTasksReader', defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [TaskImportJobConfiguration.class] and overriding is disabled.
 Action:
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

For completeness: my application @Configuration class also instantiates the StepScope:

@SpringBootApplication
@Configuration
@EnableBatchProcessing
public class Application {

    @Bean
    public static StepScope scope() {
        return new StepScope();
    }
   // ... other stuff hidden here
}  

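Your item reader is not declared as a bean, so it is not proxied and lazily created when the step requests it. With your current configuration, the step simply calls the method directly with a null argument, and the reader is built immediately at application startup.

You need to add @Bean in addition to @StepScope: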

@Bean
@StepScope
public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {
  // ...
}
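
The difference between calling the factory method directly and going through a lazily resolved proxy can be illustrated without Spring. The following is only a simplified model built on a JDK dynamic proxy. The names LazyReaderDemo, Reader, createReader and lazyProxy are made up for this illustration, and Spring's real scoped proxy is more involved than this sketch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class LazyReaderDemo {

    /** Hypothetical stand-in for an item reader; not a Spring Batch type. */
    public interface Reader {
        String describe();
    }

    /** Stand-in for the factory method: like FileSystemResource, it rejects a null path. */
    public static Reader createReader(String path) {
        if (path == null) {
            throw new IllegalArgumentException("Path must not be null");
        }
        return () -> "reading " + path;
    }

    /** Returns a proxy that invokes the factory only when a method is first called on it. */
    public static Reader lazyProxy(Supplier<Reader> factory) {
        InvocationHandler handler = new InvocationHandler() {
            private Reader target; // created on first use, not at wiring time

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (target == null) {
                    target = factory.get();
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // surface the target's own exception
                }
            }
        };
        return (Reader) Proxy.newProxyInstance(
                Reader.class.getClassLoader(), new Class<?>[] { Reader.class }, handler);
    }

    public static void main(String[] args) {
        // The "job parameter" is unknown while the application is being wired.
        AtomicReference<String> inputFile = new AtomicReference<>();

        // Direct call at wiring time: fails immediately, as in the question.
        try {
            createReader(inputFile.get());
        } catch (IllegalArgumentException e) {
            System.out.println("eager call failed: " + e.getMessage());
        }

        // Proxy at wiring time: nothing is created yet, so nothing fails.
        Reader reader = lazyProxy(() -> createReader(inputFile.get()));

        // Later, the job is launched and the parameter becomes available.
        inputFile.set("tasks.csv");
        System.out.println(reader.describe());
    }
}
```

In this model the step would hold the proxy rather than the reader itself, so the null check inside the factory only runs once the parameter has a value.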

Edit: based on the question update

For completeness: my application @Configuration class also instantiates the StepScope

When you use @EnableBatchProcessing, you do not need to register the step scope manually; the annotation adds it automatically. Here is an excerpt from its Javadoc:
Once you have an @EnableBatchProcessing class in your configuration you will have an instance of StepScope and JobScope so your beans inside steps can have @Scope("step") and @Scope("job") respectively.

So in your case, you need to remove the bean of type StepScope.
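
Putting both points together, the reader and step wiring could look roughly like the configuration fragment below. It is a sketch, not a drop-in replacement: it assumes Spring Batch 4.3.x on the classpath, reuses the CsvTask class and the taskWriter bean from the question, and uses a made-up class name (TaskImportStepSketch). Receiving the reader as a method parameter instead of calling importTasksReader(null) is a stylistic choice made here; it is not something the answer requires.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

// Hypothetical class name; CsvTask and the taskWriter bean come from the question.
// No StepScope bean is declared anywhere: @EnableBatchProcessing on the
// application class already provides one.
@Configuration
public class TaskImportStepSketch {

    // @Bean + @StepScope: the container registers a proxy, and the real reader
    // is only built while a step is running, when jobParameters are available.
    @Bean
    @StepScope
    public FlatFileItemReader<CsvTask> importTasksReader(
            @Value("#{jobParameters['inputfile']}") String inputFile) {
        return new FlatFileItemReaderBuilder<CsvTask>()
                .name("taskItemReader")
                .resource(new FileSystemResource(inputFile))
                .linesToSkip(1)
                .delimited()
                .delimiter(";")
                .names(new String[] { "taskType" })
                .targetType(CsvTask.class)
                .build();
    }

    // The reader is injected as a parameter, so no placeholder null argument
    // appears in the step definition.
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      FlatFileItemReader<CsvTask> importTasksReader,
                      JdbcBatchItemWriter<CsvTask> taskWriter) {
        return stepBuilderFactory.get("importTasksJob.step1")
                .<CsvTask, CsvTask>chunk(10)
                .reader(importTasksReader)
                .writer(taskWriter)
                .build();
    }
}
```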