Spring 项目处理器中的批处理运行时异常
Spring Batch Runtime Exception in Item processor
我正在学习 spring 批处理并试图了解异常期间项目处理器的工作原理。
我正在从 csv 文件中以 3 条记录的形式读取数据并对其进行处理并将其写入数据库。
我的 csv 文件
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doem
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
批量配置,读取 3 块中的项目,并跳过限制 2
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited()
.names(new String[] { "firstName", "lastName" }).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class);
}
}).build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1").<Person, Person> chunk(3).reader(reader()).processor(processor()).writer(writer).faultTolerant().skipLimit(2)
.skip(Exception.class).build();
}
}
我正在尝试通过为我的项目处理器中的一条记录手动抛出异常来模拟异常
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
if (person.getLastName().equals("Doem"))
throw new Exception("DOOM");
return transformedPerson;
}
}
现在根据跳过限制,当抛出异常时,项目处理器正在重新处理块并跳过抛出错误的项目,并且项目写入也将所有记录插入 DB 中,除了有异常的记录。
这一切都很好,因为我的处理器,它只是将小写名称转换为大写名称,它可以 运行 多次而不会产生影响。
但假设我的项目处理器正在调用 Web 服务并发送数据。
如果在成功调用 Web 服务后抛出一些异常。然后将再次处理块中的剩余数据(并再次调用 webservice)。
不想再调用web service,因为这就像向web service发送重复数据,而webservice系统无法识别重复数据。
如何处理这种情况。一种选择是不要跳过异常,这意味着即使处理器调用了 Web 服务,我在块中的一条记录也不会进入项目编写器。所以这是不正确的。
其他选项块的大小应为 1 ,那么这在处理数千条记录时可能效率不高。
还有哪些选择?
根据您的描述,您的项目处理器不是幂等的。但是,文档的 Fault tolerance 部分指出,在使用容错步骤时,项目处理器应该是幂等的。以下是摘录:
If a step is configured to be fault tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in a way that is idempotent.
我正在学习 spring 批处理并试图了解异常期间项目处理器的工作原理。
我正在从 csv 文件中以 3 条记录的形式读取数据并对其进行处理并将其写入数据库。
我的 csv 文件
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doem
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
批量配置,读取 3 块中的项目,并跳过限制 2
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited()
.names(new String[] { "firstName", "lastName" }).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class);
}
}).build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1").<Person, Person> chunk(3).reader(reader()).processor(processor()).writer(writer).faultTolerant().skipLimit(2)
.skip(Exception.class).build();
}
}
我正在尝试通过为我的项目处理器中的一条记录手动抛出异常来模拟异常
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
if (person.getLastName().equals("Doem"))
throw new Exception("DOOM");
return transformedPerson;
}
}
现在根据跳过限制,当抛出异常时,项目处理器正在重新处理块并跳过抛出错误的项目,并且项目写入也将所有记录插入 DB 中,除了有异常的记录。
这一切都很好,因为我的处理器,它只是将小写名称转换为大写名称,它可以 运行 多次而不会产生影响。
但假设我的项目处理器正在调用 Web 服务并发送数据。 如果在成功调用 Web 服务后抛出一些异常。然后将再次处理块中的剩余数据(并再次调用 webservice)。 不想再调用web service,因为这就像向web service发送重复数据,而webservice系统无法识别重复数据。
如何处理这种情况。一种选择是不要跳过异常,这意味着即使处理器调用了 Web 服务,我在块中的一条记录也不会进入项目编写器。所以这是不正确的。
其他选项块的大小应为 1 ,那么这在处理数千条记录时可能效率不高。
还有哪些选择?
根据您的描述,您的项目处理器不是幂等的。但是,文档的 Fault tolerance 部分指出,在使用容错步骤时,项目处理器应该是幂等的。以下是摘录:
If a step is configured to be fault tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in a way that is idempotent.