Spring Batch job parameters in StepScope
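I have a Spring Batch job (v4.3.1, no XML configuration) that loads CSV files into a database.
The jobs are launched by a REST controller, so they do not start automatically at application startup.
Everything works fine when the file name is known at application startup.
But when I try to pass a jobParameter to the step (and annotate it with @StepScope), I get an exception at application startup (at which point the job has not been launched yet).
The exception I get is: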
Caused by: java.lang.IllegalArgumentException: Path must not be null
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.2.jar!/:5.3.2]
at org.springframework.core.io.FileSystemResource.<init>(FileSystemResource.java:80) ~[spring-core-5.3.2.jar!/:5.3.2]
at TaskImportJobConfiguration.importTasksReader(TaskImportJobConfiguration.java:66) ~[classes!/:na]
at TaskImportJobConfiguration.step1(TaskImportJobConfiguration.java:109) ~[classes!/:na]
at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a.CGLIB$step1(<generated>) ~[classes!/:na]
at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a$$FastClassBySpringCGLIB$87bf2f.invoke(<generated>) ~[classes!/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.3.2.jar!/:5.3.2]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.3.2.jar!/:5.3.2]
at TaskImportJobConfiguration$$EnhancerBySpringCGLIB$$b686b37a.step1(<generated>) ~[classes!/:na]
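The step1() method calls .reader(importTasksReader(null)). This should work, because the parameter is supposed to be bound at runtime. But I noticed that this is not what happens: the moment step1() is called (so at application startup), the FileSystemResource passed to my ItemReader checks the argument, which is null at that point, and throws the exception.
What am I doing wrong?
Here is my code: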
import java.beans.PropertyEditor;
import java.beans.PropertyEditorSupport;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@Configuration
public class TaskImportJobConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(TaskImportJobConfiguration.class);
    private static final String QUERY_INSERT_TASK = "INSERT INTO TASK (N_I_IDF, C_I_TASK_CODE, C_TASK_TYPE_CODE) "
            + "VALUES (NEXTVAL FOR TASK_SEQ, concat('BC',varchar_format(NEXTVAL FOR TASK_SEQ,'000000000')), :taskType)";

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importTasksJob(Step step1) {
        return jobBuilderFactory.get("importTasksJob").incrementer(new RunIdIncrementer())
                .flow(step1).end().build();
    }

    private static final DateTimeFormatter DATE_FORMATTER=DateTimeFormatter.ofPattern("dd/MM/yyyy");

    @StepScope
    public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {
        logger.info("IN-ImportTasksReader ("+inputFile+")");
        return new FlatFileItemReaderBuilder<CsvTask>()
                .name("taskItemReader")//
                .resource(new FileSystemResource(inputFile))//
                .linesToSkip(1)//
                .delimited()//
                .delimiter(";")
                .names(new String[] { "taskType"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvTask>() {
                    {
                        setTargetType(CsvTask.class);
                        // 8< some custom editors >8
                    }
                }).build();
    }

    @Bean
    public JdbcBatchItemWriter<CsvTask> taskWriter(DataSource datasource, NamedParameterJdbcTemplate jdbcTemplate) {
        JdbcBatchItemWriter<CsvTask> itemwriter=new JdbcBatchItemWriter<>();
        itemwriter.setDataSource(datasource);
        itemwriter.setJdbcTemplate(jdbcTemplate);
        itemwriter.setSql(QUERY_INSERT_TASK);
        itemwriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return itemwriter;
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<CsvTask> taskWriter) {
        return stepBuilderFactory //
                .get("importTasksJob.step1")//
                .<CsvTask, CsvTask>chunk(10) //
                .reader(importTasksReader(null)) //
                .processor(new PassThroughItemProcessor<CsvTask>())
                .writer(taskWriter) //
                .build();
    }
}
Update: when I add the @Bean annotation to importTasksReader, as in:
@Bean
@StepScope
public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {
I get the following exception:
2021-02-15 15:10:06.531 WARN 1 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'scopedTarget.importTasksReader' defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class]:
Cannot register bean definition [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class]] for bean 'scopedTarget.importTasksReader':
There is already [Root bean: class [null]; scope=step; abstract=false; lazyInit=null; autowireMode=3; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=taskImportJobConfiguration; factoryMethodName=importTasksReader; initMethodName=null; destroyMethodName=(inferred); defined in class path resource [TaskImportJobConfiguration.class]] bound.
2021-02-15 15:10:06.577 ERROR 1 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
The bean 'scopedTarget.importTasksReader', defined in BeanDefinition defined in class path resource [TaskImportJobConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [TaskImportJobConfiguration.class] and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
For completeness: my application @Configuration class also instantiates the StepScope:
@SpringBootApplication
@Configuration
@EnableBatchProcessing
public class Application {
    @Bean
    public static StepScope scope() {
        return new StepScope();
    }
    // ... other stuff hidden here
}
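Your item reader is not declared as a bean, so it is not proxied and lazily created when the step requests it. With your current configuration, the step simply calls the method with a null argument.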
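To illustrate what "proxied and lazily created" means, here is a minimal plain-JDK sketch. It is not how Spring Batch implements the step scope (Spring uses its own scoped-proxy machinery); the Reader interface and the newReader and lazyProxy helpers are hypothetical names made up for this illustration. The point is only that a proxy can be handed out at startup while the real object, which needs the parameter, is built on first use.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

public class LazyProxySketch {

    /** Hypothetical stand-in for an item reader; not the Spring Batch interface. */
    public interface Reader {
        String read();
    }

    /** Number of real readers constructed so far. */
    public static int created = 0;

    /** Mimics the question's factory method: it fails fast on a null path. */
    public static Reader newReader(String path) {
        if (path == null) {
            throw new IllegalArgumentException("Path must not be null");
        }
        created++;
        return () -> "line from " + path;
    }

    /** Returns a proxy that builds the real reader only on the first method call. */
    public static Reader lazyProxy(Supplier<Reader> targetFactory) {
        InvocationHandler handler = new InvocationHandler() {
            private Reader target;

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (target == null) {
                    target = targetFactory.get(); // deferred until the reader is actually used
                }
                return method.invoke(target, args);
            }
        };
        return (Reader) Proxy.newProxyInstance(
                Reader.class.getClassLoader(), new Class<?>[] {Reader.class}, handler);
    }

    public static void main(String[] args) {
        String[] inputFile = {null}; // unknown at "startup", like a job parameter

        // Building the proxy does not touch the parameter, so nothing fails here.
        Reader reader = lazyProxy(() -> newReader(inputFile[0]));
        System.out.println("readers created at startup: " + created);

        inputFile[0] = "tasks.csv"; // the "job launch" supplies the value
        System.out.println(reader.read());
        System.out.println("readers created after first read: " + created);
    }
}
```

Calling newReader(null) directly, which is the analogue of importTasksReader(null) on a method that is not a bean, throws immediately; going through the proxy postpones that call until a value is available.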
In addition to @StepScope, you need to add @Bean:
@Bean
@StepScope
public FlatFileItemReader<CsvTask> importTasksReader(@Value("#{jobParameters['inputfile']}") String inputFile) {
// ...
}
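Put together, the relevant part of the configuration might look like the sketch below. It assumes the CsvTask class and the taskWriter bean from the question. The class name TaskImportReaderSketch is made up, and targetType(...) stands in for the question's custom field set mapper, whose editors were elided. Injecting the reader into step1 as a method parameter, rather than calling importTasksReader(null), is a stylistic choice of this sketch and not something the fix requires.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class TaskImportReaderSketch {

    // @Bean registers the method with the container; @StepScope then puts a proxy
    // in front of it, so the body only runs (with the real job parameter) inside
    // a running step.
    @Bean
    @StepScope
    public FlatFileItemReader<CsvTask> importTasksReader(
            @Value("#{jobParameters['inputfile']}") String inputFile) {
        return new FlatFileItemReaderBuilder<CsvTask>()
                .name("taskItemReader")
                .resource(new FileSystemResource(inputFile))
                .linesToSkip(1)
                .delimited()
                .delimiter(";")
                .names(new String[] { "taskType" })
                .targetType(CsvTask.class) // simplification: the question uses a custom mapper
                .build();
    }

    // The reader arrives here as an injected bean (the step-scoped proxy),
    // so no placeholder argument is needed.
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      FlatFileItemReader<CsvTask> importTasksReader,
                      ItemWriter<CsvTask> taskWriter) {
        return stepBuilderFactory
                .get("importTasksJob.step1")
                .<CsvTask, CsvTask>chunk(10)
                .reader(importTasksReader)
                .writer(taskWriter)
                .build();
    }
}
```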
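Edit: based on the question update
"For completeness: my application @Configuration class also instantiates the StepScope"
When you use @EnableBatchProcessing, you do not need to register the step scope manually; the annotation adds it automatically. Here is an excerpt from its Javadoc: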
Once you have an @EnableBatchProcessing class in your configuration you will have an instance of StepScope and JobScope so your beans inside steps can have @Scope("step") and @Scope("job") respectively.
So in your case, you need to remove the scope bean of type StepScope.
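With the reader declared as a step-scoped bean, the inputfile value is resolved from the parameters of each launch. As a rough sketch of the launching side, a controller could pass the parameter as shown below. The controller class, the /import-tasks endpoint, the file request parameter, and the startedAt job parameter are all assumptions for illustration; the question does not show its controller.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TaskImportController {

    private final JobLauncher jobLauncher;
    private final Job importTasksJob;

    public TaskImportController(JobLauncher jobLauncher, Job importTasksJob) {
        this.jobLauncher = jobLauncher;
        this.importTasksJob = importTasksJob;
    }

    @PostMapping("/import-tasks") // hypothetical endpoint
    public String launch(@RequestParam("file") String file) throws Exception {
        JobParameters params = new JobParametersBuilder()
                // Must match the key used in #{jobParameters['inputfile']}.
                .addString("inputfile", file)
                // Extra identifying parameter so each launch is a new job instance.
                .addLong("startedAt", System.currentTimeMillis())
                .toJobParameters();
        return jobLauncher.run(importTasksJob, params).getStatus().toString();
    }
}
```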