TransactionAttribute 不适用于简单步骤
TransactionAttribute not working for simple step
(请注意,此问题可能与 有关,但范围要小得多。)
我有这样定义的最简单的工作:
@Configuration
@EnableBatchProcessing
public class FileTransformerConfiguration {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
@Autowired
public FileTransformerConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job transformJob() {
return this.jobBuilderFactory.get("transformJob").incrementer(new RunIdIncrementer())
.flow(transformStep()).end().build();
}
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter()).build();
}
@Bean
public ItemProcessor<String, String> processor() {
return item -> {
System.out.println("Converting item (" + item + ")...");
return item;
};
}
}
public class ItemReader implements ItemStreamReader<String> {
private Iterator<String> it;
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.it = Arrays.asList("A", "B", "C", "D", "E").iterator();
}
@Override
public String read() throws Exception {
return this.it.hasNext() ? this.it.next() : null;
}
@Override
public void close() throws ItemStreamException { }
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {}
}
@JobScope
public class ItemWriter implements ItemStreamWriter<String> {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(item -> System.out.println("Writing item: " + item));
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void close() throws ItemStreamException { }
}
没有花哨的逻辑,只有字符串在管道中移动。
代码是这样调用的:
@SpringBootApplication
public class TestCmpsApplication {
}
@SpringBootTest(classes = {TestCmpsApplication.class})
public class FileTransformerImplIT {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job transformJob;
@Test
void test1() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
@Test
void test2() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
}
(注意需要进行两次测试,即使它们是相同的。第一个总是有效。)
所以这很好用。但是,一旦我添加了这个:
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter())
.transactionAttribute(transactionAttribute()).build();
}
private TransactionAttribute transactionAttribute() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.NEVER.value());
return attribute;
}
现在第二次测试失败了。测试本身说
TransactionSuspensionNotSupportedException: Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension
虽然日志有助于提供此错误:
IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'
好的。我直接告诉工作永远不要使用事务,但不知何故,无论如何,有人创建了一个。那么让我们试试MANDATORY
。现在测试出现与上面相同的错误,日志现在显示:
IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'
不知何故,有人创建了一个交易,但不是为所有两个工作? SUPPORTS
肯定会起作用。不,那么测试将失败并出现相同的异常,并且日志将具有:
OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3
我不知道发生了什么。显然有人在步骤之外创建了交易,但我不知道如何阻止它们。因为我宁愿没有交易。或者至少一个有效的事务管理是事务在连续调用两次时将工作相同。
我尝试了 Spring 批次 4.2、4.2.5、4.3 和 4.3.1。
我做错了什么?我怎样才能完成这项工作?
问题出在默认作业存储库上。似乎它的事务处理 is buggy。要解决此问题,请将其替换为具有内存数据库的 JDBC 作业存储库。只需将此 class 添加到 Spring 上下文:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchContextConfigurer extends DefaultBatchConfigurer {
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDatabaseType(DatabaseType.H2.getProductName());
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
return factory.getObject();
}
public DataSource dataSource() {
EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
return embeddedDatabaseBuilder
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.setType(EmbeddedDatabaseType.H2).build();
}
}
(请注意,此问题可能与
我有这样定义的最简单的工作:
@Configuration
@EnableBatchProcessing
public class FileTransformerConfiguration {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
@Autowired
public FileTransformerConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job transformJob() {
return this.jobBuilderFactory.get("transformJob").incrementer(new RunIdIncrementer())
.flow(transformStep()).end().build();
}
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter()).build();
}
@Bean
public ItemProcessor<String, String> processor() {
return item -> {
System.out.println("Converting item (" + item + ")...");
return item;
};
}
}
public class ItemReader implements ItemStreamReader<String> {
private Iterator<String> it;
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.it = Arrays.asList("A", "B", "C", "D", "E").iterator();
}
@Override
public String read() throws Exception {
return this.it.hasNext() ? this.it.next() : null;
}
@Override
public void close() throws ItemStreamException { }
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {}
}
@JobScope
public class ItemWriter implements ItemStreamWriter<String> {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(item -> System.out.println("Writing item: " + item));
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void close() throws ItemStreamException { }
}
没有花哨的逻辑,只有字符串在管道中移动。
代码是这样调用的:
@SpringBootApplication
public class TestCmpsApplication {
}
@SpringBootTest(classes = {TestCmpsApplication.class})
public class FileTransformerImplIT {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job transformJob;
@Test
void test1() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
@Test
void test2() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
}
(注意需要进行两次测试,即使它们是相同的。第一个总是有效。)
所以这很好用。但是,一旦我添加了这个:
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter())
.transactionAttribute(transactionAttribute()).build();
}
private TransactionAttribute transactionAttribute() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.NEVER.value());
return attribute;
}
现在第二次测试失败了。测试本身说
TransactionSuspensionNotSupportedException: Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension
虽然日志有助于提供此错误:
IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'
好的。我直接告诉工作永远不要使用事务,但不知何故,无论如何,有人创建了一个。那么让我们试试MANDATORY
。现在测试出现与上面相同的错误,日志现在显示:
IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'
不知何故,有人创建了一个交易,但不是为所有两个工作? SUPPORTS
肯定会起作用。不,那么测试将失败并出现相同的异常,并且日志将具有:
OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3
我不知道发生了什么。显然有人在步骤之外创建了交易,但我不知道如何阻止它们。因为我宁愿没有交易。或者至少一个有效的事务管理是事务在连续调用两次时将工作相同。
我尝试了 Spring 批次 4.2、4.2.5、4.3 和 4.3.1。
我做错了什么?我怎样才能完成这项工作?
问题出在默认作业存储库上。似乎它的事务处理 is buggy。要解决此问题,请将其替换为具有内存数据库的 JDBC 作业存储库。只需将此 class 添加到 Spring 上下文:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchContextConfigurer extends DefaultBatchConfigurer {
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDatabaseType(DatabaseType.H2.getProductName());
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
return factory.getObject();
}
public DataSource dataSource() {
EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
return embeddedDatabaseBuilder
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.setType(EmbeddedDatabaseType.H2).build();
}
}