Spring 批处理、JpaRepository 和回滚

Spring Batch, JpaRepository and Rollback

我有一个 Spring 批处理应用程序(Spring Boot 2.3.5.RELEASE),它使用 JpaRepository 将一些自定义日志消息作为 Spring 批处理插入到数据库中正在处理。这与开箱即用的 Spring 批处理表是分开的。似乎当我从 ItemProcessorAdapter 抛出异常时,它被 ItemProcessListener onProcessError() 方法捕获。在这个方法中,我正在执行 JpaRepository save() 和 flush()。没有记录错误,但是一旦我离开这个方法,JpaRepository 就会回滚。

  1. 这是正常现象吗?我该如何解决?
  2. 使用JpaRepository时,有没有办法设置@Transactional(noRollbackFor = {xxxException.class})?这个我试过了,好像没有效果。

下面是示例代码片段。


@Configuration
public class BatchJobConfiguration {

   //Omitted for clarity....

    @Bean
    @StepScope
    public CompositeItemProcessor<Decision,Decision> itemProcessor() {
        CompositeItemProcessor<Decision,Decision> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates(Arrays.asList(
                decisionValidatingItemProcessor(),
                myItemProcessor(null)
        ));
        return itemProcessor;
    } // end itemProcessor()
    
     @Bean
    public BeanValidatingItemProcessor<Decision> decisionValidatingItemProcessor() {
        BeanValidatingItemProcessor<Decision> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
        beanValidatingItemProcessor.setFilter(true);
        return beanValidatingItemProcessor;
    } // end decisionValidatingItemProcessor()

    @Bean
    public ItemProcessorAdapter<Decision,Decision> myItemProcessor(DecisionProcessingService service) {
        ItemProcessorAdapter<Decision,Decision> adapter = new ItemProcessorAdapter<>();
        adapter.setTargetObject(service);
        adapter.setTargetMethod("processDecision");
        return adapter;
    }
    
    @Bean
    @StepScope
    public DecisionItemProcessListener decisionItemProcessListener() {
        return new DecisionItemProcessListener(mpJpaRepository);
    }


}

@Service
public class DecisionProcessingService {

    public Decision processDecision(Decision decision) throws BatchException {
        ....
        throw new BatchException("An error occurred");
    
    }

}



public class DecisionItemProcessListener implements ItemProcessListener<Decision,Decision> {
    private MyJpaRepository mpJpaRepository;

    public DecisionItemProcessListener(MyJpaRepository mpJpaRepository) {
        this.mpJpaRepository = mpJpaRepository;
    }
    
    ....
    
    @Override
    public void onProcessError(Decision decision, Exception e) {
        MyEntityObject obj = MyEntityObject.builder()
                .msg(e.getMessage())
                .build();
        mpJpaRepository.save(obj);
        mpJpaRepository.flush();
        
        // after this, the insert above is rolled back.
    } // end onProcessError()
}

您在此处使用的回调 ItemProcessListener#onProcessError 是在事务中调用的(由 Spring 批处理驱动),由于项目处理器抛出的异常,该事务将被回滚.

如果你想用那个方法保存数据,你需要使用一个新的事务(使用REQUIRES_NEW传播)。

编辑: 我在这里分享了一个最小的完整示例:https://github.com/benas/spring-batch-lab/tree/master/issues/so64913980.