Spring 批处理、JpaRepository 和回滚
Spring Batch, JpaRepository and Rollback
我有一个 Spring 批处理应用程序(Spring Boot 2.3.5.RELEASE),它使用 JpaRepository 将一些自定义日志消息作为 Spring 批处理插入到数据库中正在处理。这与开箱即用的 Spring 批处理表是分开的。似乎当我从 ItemProcessorAdapter 抛出异常时,它被 ItemProcessListener onProcessError() 方法捕获。在这个方法中,我正在执行 JpaRepository save() 和 flush()。没有记录错误,但是一旦我离开这个方法,JpaRepository 就会回滚。
- 这是正常现象吗?我该如何解决?
- 使用JpaRepository时,有没有办法设置@Transactional(noRollbackFor = {xxxException.class})?这个我试过了,好像没有效果。
下面是示例代码片段。
@Configuration
public class BatchJobConfiguration {
//Omitted for clarity....
@Bean
@StepScope
public CompositeItemProcessor<Decision,Decision> itemProcessor() {
CompositeItemProcessor<Decision,Decision> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates(Arrays.asList(
decisionValidatingItemProcessor(),
myItemProcessor(null)
));
return itemProcessor;
} // end itemProcessor()
@Bean
public BeanValidatingItemProcessor<Decision> decisionValidatingItemProcessor() {
BeanValidatingItemProcessor<Decision> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
} // end decisionValidatingItemProcessor()
@Bean
public ItemProcessorAdapter<Decision,Decision> myItemProcessor(DecisionProcessingService service) {
ItemProcessorAdapter<Decision,Decision> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(service);
adapter.setTargetMethod("processDecision");
return adapter;
}
@Bean
@StepScope
public DecisionItemProcessListener decisionItemProcessListener() {
return new DecisionItemProcessListener(mpJpaRepository);
}
}
@Service
public class DecisionProcessingService {
public Decision processDecision(Decision decision) throws BatchException {
....
throw new BatchException("An error occurred");
}
}
public class DecisionItemProcessListener implements ItemProcessListener<Decision,Decision> {
private MyJpaRepository mpJpaRepository;
public DecisionItemProcessListener(MyJpaRepository mpJpaRepository) {
this.mpJpaRepository = mpJpaRepository;
}
....
@Override
public void onProcessError(Decision decision, Exception e) {
MyEntityObject obj = MyEntityObject.builder()
.msg(e.getMessage())
.build();
mpJpaRepository.save(obj);
mpJpaRepository.flush();
// after this, the insert above is rolled back.
} // end onProcessError()
}
您在此处使用的回调 ItemProcessListener#onProcessError
是在事务中调用的(由 Spring 批处理驱动),由于项目处理器抛出的异常,该事务将被回滚.
如果你想用那个方法保存数据,你需要使用一个新的事务(使用REQUIRES_NEW
传播)。
编辑: 我在这里分享了一个最小的完整示例:https://github.com/benas/spring-batch-lab/tree/master/issues/so64913980.
我有一个 Spring 批处理应用程序(Spring Boot 2.3.5.RELEASE),它使用 JpaRepository 将一些自定义日志消息作为 Spring 批处理插入到数据库中正在处理。这与开箱即用的 Spring 批处理表是分开的。似乎当我从 ItemProcessorAdapter 抛出异常时,它被 ItemProcessListener onProcessError() 方法捕获。在这个方法中,我正在执行 JpaRepository save() 和 flush()。没有记录错误,但是一旦我离开这个方法,JpaRepository 就会回滚。
- 这是正常现象吗?我该如何解决?
- 使用JpaRepository时,有没有办法设置@Transactional(noRollbackFor = {xxxException.class})?这个我试过了,好像没有效果。
下面是示例代码片段。
@Configuration
public class BatchJobConfiguration {
//Omitted for clarity....
@Bean
@StepScope
public CompositeItemProcessor<Decision,Decision> itemProcessor() {
CompositeItemProcessor<Decision,Decision> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates(Arrays.asList(
decisionValidatingItemProcessor(),
myItemProcessor(null)
));
return itemProcessor;
} // end itemProcessor()
@Bean
public BeanValidatingItemProcessor<Decision> decisionValidatingItemProcessor() {
BeanValidatingItemProcessor<Decision> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
} // end decisionValidatingItemProcessor()
@Bean
public ItemProcessorAdapter<Decision,Decision> myItemProcessor(DecisionProcessingService service) {
ItemProcessorAdapter<Decision,Decision> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(service);
adapter.setTargetMethod("processDecision");
return adapter;
}
@Bean
@StepScope
public DecisionItemProcessListener decisionItemProcessListener() {
return new DecisionItemProcessListener(mpJpaRepository);
}
}
@Service
public class DecisionProcessingService {
public Decision processDecision(Decision decision) throws BatchException {
....
throw new BatchException("An error occurred");
}
}
public class DecisionItemProcessListener implements ItemProcessListener<Decision,Decision> {
private MyJpaRepository mpJpaRepository;
public DecisionItemProcessListener(MyJpaRepository mpJpaRepository) {
this.mpJpaRepository = mpJpaRepository;
}
....
@Override
public void onProcessError(Decision decision, Exception e) {
MyEntityObject obj = MyEntityObject.builder()
.msg(e.getMessage())
.build();
mpJpaRepository.save(obj);
mpJpaRepository.flush();
// after this, the insert above is rolled back.
} // end onProcessError()
}
您在此处使用的回调 ItemProcessListener#onProcessError
是在事务中调用的(由 Spring 批处理驱动),由于项目处理器抛出的异常,该事务将被回滚.
如果你想用那个方法保存数据,你需要使用一个新的事务(使用REQUIRES_NEW
传播)。
编辑: 我在这里分享了一个最小的完整示例:https://github.com/benas/spring-batch-lab/tree/master/issues/so64913980.