TransactionAttribute not working for simple step

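(Please note that this question may be related to another question, but it is much smaller in scope.)

I have the simplest possible job, defined like this: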


@Configuration
@EnableBatchProcessing
public class FileTransformerConfiguration {

  private JobBuilderFactory jobBuilderFactory;
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  public FileTransformerConfiguration(JobBuilderFactory jobBuilderFactory,
      StepBuilderFactory stepBuilderFactory) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
  }

  @Bean
  public Job transformJob() {
    return this.jobBuilderFactory.get("transformJob").incrementer(new RunIdIncrementer())
        .flow(transformStep()).end().build();
  }

  @Bean
  public Step transformStep() {
    return this.stepBuilderFactory.get("transformStep")
        .<String, String>chunk(1).reader(new ItemReader())
        .processor(processor())
        .writer(new ItemWriter()).build();
  }

  @Bean
  public ItemProcessor<String, String> processor() {
    return item -> {
      System.out.println("Converting item (" + item + ")...");
      return item;
    };
  }
}



public class ItemReader implements ItemStreamReader<String> {

  private Iterator<String> it;

  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    this.it = Arrays.asList("A", "B", "C", "D", "E").iterator();
  }

  @Override
  public String read() throws Exception {
    return this.it.hasNext() ? this.it.next() : null;
  }

  @Override
  public void close() throws ItemStreamException  { }

  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {}
}



@JobScope
public class ItemWriter implements ItemStreamWriter<String> {

  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException { }

  @Override
  public void write(List<? extends String> items) throws Exception {
    items.forEach(item -> System.out.println("Writing item: " + item));
  }

  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException { }

  @Override
  public void close() throws ItemStreamException { }

}

There is no fancy logic, just strings being moved through a pipeline.

The code is invoked like this:

@SpringBootApplication
public class TestCmpsApplication {

}

@SpringBootTest(classes = {TestCmpsApplication.class})
public class FileTransformerImplIT {

  @Autowired
  private JobLauncher jobLauncher;
  @Autowired
  private Job transformJob;

  @Test
  void test1() throws Exception {
    String id = UUID.randomUUID().toString();
    JobParametersBuilder jobParameters = new JobParametersBuilder();
    jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
    jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
    this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
  }

  @Test
  void test2() throws Exception {
    String id = UUID.randomUUID().toString();
    JobParametersBuilder jobParameters = new JobParametersBuilder();
    jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
    jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
    this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
  }

}

(Note that two tests are needed, even though they are identical. The first one always works.)

So this works fine. However, as soon as I add this:

  @Bean
  public Step transformStep() {
    return this.stepBuilderFactory.get("transformStep")
        .<String, String>chunk(1).reader(new ItemReader())
        .processor(processor())
        .writer(new ItemWriter())
        .transactionAttribute(transactionAttribute()).build();
  }

  private TransactionAttribute transactionAttribute() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.NEVER.value());
    return attribute;
  }

the second test fails. The test itself reports:

TransactionSuspensionNotSupportedException: Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension

while the log helpfully provides this error:

IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'

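OK. I explicitly told the job never to use a transaction, but somehow someone created one anyway. So let's try MANDATORY. Now the test fails with the same error as above, and the log shows: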

IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'
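For reference, the rules behind these two messages can be written down as a small plain-Java toy model. This is only a sketch: `PropagationRule` and `PropagationModel` are hypothetical names invented for illustration and are not Spring classes. The model only encodes the documented meaning of the propagation settings: NEVER rejects an existing transaction, MANDATORY rejects a missing one, and SUPPORTS (included for comparison) accepts both cases.

```java
import java.util.Optional;

// Toy model only: the constants mirror Spring's propagation names,
// but none of this is Spring code.
enum PropagationRule { NEVER, MANDATORY, SUPPORTS }

final class PropagationModel {

  /**
   * Returns the error message raised for the given rule and transaction state,
   * or an empty Optional when the call is allowed to proceed.
   */
  static Optional<String> check(PropagationRule rule, boolean transactionExists) {
    switch (rule) {
      case NEVER:
        // NEVER refuses to run inside an existing transaction.
        return transactionExists
            ? Optional.of("Existing transaction found for transaction marked with propagation 'never'")
            : Optional.empty();
      case MANDATORY:
        // MANDATORY refuses to run without an existing transaction.
        return transactionExists
            ? Optional.empty()
            : Optional.of("No existing transaction found for transaction marked with propagation 'mandatory'");
      default:
        // SUPPORTS joins a transaction if there is one and runs without one otherwise.
        return Optional.empty();
    }
  }

  public static void main(String[] args) {
    for (PropagationRule rule : PropagationRule.values()) {
      for (boolean exists : new boolean[] {false, true}) {
        System.out.println(rule + ", transaction exists=" + exists + " -> "
            + check(rule, exists).orElse("proceeds"));
      }
    }
  }
}
```

Read against this model, the NEVER error implies that a transaction was active at the point of the check, while the MANDATORY error implies that none was. The two runs therefore seem to hit the check in different transactional states.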

So somehow someone creates a transaction, but not for both jobs? Surely SUPPORTS will work. No: the test then fails with the same exception, and the log contains:

OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3
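The wording of this message matches the usual version-based optimistic locking pattern: an update is accepted only if the caller still holds the version currently stored. The following plain-Java sketch illustrates that general pattern. It is an assumption about the kind of check involved, not Spring Batch's actual implementation, and `VersionedRecord` is a hypothetical class made up for this example.

```java
// Toy model of version-based optimistic locking; not Spring Batch code.
final class VersionedRecord {
  private int version;
  private String status;

  VersionedRecord(int version, String status) {
    this.version = version;
    this.status = status;
  }

  int getVersion() {
    return version;
  }

  String getStatus() {
    return status;
  }

  /** Applies the update only if the caller's version is current, then bumps the version. */
  void update(int expectedVersion, String newStatus) {
    if (expectedVersion != version) {
      throw new IllegalStateException("Attempt to update with wrong version ("
          + expectedVersion + "), where current version is " + version);
    }
    status = newStatus;
    version++;
  }

  public static void main(String[] args) {
    VersionedRecord stepExecution = new VersionedRecord(2, "STARTED");
    int staleVersion = stepExecution.getVersion();                  // first caller reads version 2
    stepExecution.update(stepExecution.getVersion(), "COMPLETED"); // second caller updates; version becomes 3
    try {
      stepExecution.update(staleVersion, "FAILED");                 // first caller still holds version 2
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model the failure appears when two writers work from the same snapshot of a record. If the real check works the same way, the error suggests that the step execution was updated twice from the same starting version.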

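I have no idea what is going on. Apparently someone is creating transactions outside the step, and I don't know how to stop them, because I would rather have no transactions at all. Or at least a working transaction management, where "working" means it behaves the same when invoked twice in a row.

I tried Spring Batch 4.2, 4.2.5, 4.3 and 4.3.1.

What am I doing wrong? How can I make this work?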

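The problem lies with the default job repository. Its transaction handling appears to be buggy. To work around this, replace it with a JDBC job repository backed by an in-memory database. Just add this class to the Spring context: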

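import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.DatabaseType;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration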
@EnableBatchProcessing
public class InMemoryBatchContextConfigurer extends DefaultBatchConfigurer {

  @Override
  protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDatabaseType(DatabaseType.H2.getProductName());
    factory.setDataSource(dataSource());
    factory.setTransactionManager(getTransactionManager());
    return factory.getObject();
  }

  public DataSource dataSource() {
    EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
    return embeddedDatabaseBuilder
        .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
        .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
        .setType(EmbeddedDatabaseType.H2).build();
  }
}