Spring 无法将事务传播到 ForkJoin 的 RecursiveAction
Spring cannot propagate transaction to ForkJoin's RecursiveAction
我正在尝试实施多线程解决方案,以便可以并行化包括读取和写入数据库的业务逻辑。
技术栈:Spring4.0.2,Hibernate 4.3.8
这里有一些代码可以讨论:
配置
@Configuration
public class PartitionersConfig {
@Bean
public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
final ForkJoinPoolFactoryBean poolFactory = new ForkJoinPoolFactoryBean();
return poolFactory;
}
}
服务
@Service
@Transactional
public class MyService {
@Autowired
private OtherService otherService;
@Autowired
private ForkJoinPool forkJoinPool;
@Autowired
private MyDao myDao;
public void performPartitionedActionOnIds() {
final ArrayList<UUID> ids = otherService.getIds();
MyIdPartitioner task = new MyIdsPartitioner(ids, myDao, 0, ids.size() - 1);
forkJoinPool.invoke(task);
}
}
存储库/DAO
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {
public MyData getData(List<UUID> list) {
// ...
}
}
递归动作
public class MyIdsPartitioner extends RecursiveAction {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 100;
private ArrayList<UUID> ids;
private int fromIndex;
private int toIndex;
private MyDao myDao;
public MyIdsPartitioner(ArrayList<UUID> ids, MyDao myDao, int fromIndex, int toIndex) {
this.ids = ids;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
this.myDao = myDao;
}
@Override
protected void compute() {
if (computationSetIsSamllEnough()) {
computeDirectly();
} else {
int leftToIndex = fromIndex + (toIndex - fromIndex) / 2;
MyIdsPartitioner leftPartitioner = new MyIdsPartitioner(ids, myDao, fromIndex, leftToIndex);
MyIdsPartitioner rightPartitioner = new MyIdsPartitioner(ids, myDao, leftToIndex + 1, toIndex);
invokeAll(leftPartitioner, rightPartitioner);
}
}
private boolean computationSetIsSamllEnough() {
return (toIndex - fromIndex) < THRESHOLD;
}
private void computeDirectly() {
final List<UUID> subList = ids.subList(fromIndex, toIndex);
final MyData myData = myDao.getData(sublist);
modifyTheData(myData);
}
private void modifyTheData(MyData myData) {
// ...
// write to DB
}
}
执行此操作后我得到:
No existing transaction found for transaction marked with propagation 'mandatory'
我知道这是完全正常的,因为交易不会通过不同的线程传播。所以一种解决方案是在每个线程中手动创建一个事务 as proposed in another similar question。但这对我来说还不够满意,所以我一直在寻找。
在 Spring 的论坛中我找到了 a discussion on the topic。有一段我觉得很有意思:
"I can imagine one could manually propagate the transaction context to another thread, but I don't think you should really try it. Transactions are bound to single threads with a reason - the basic underlying resource - jdbc connection - is not threadsafe. Using one single connection in multiple threads would break fundamental jdbc request/response contracts and it would be a small wonder if it would work in more then trivial examples."
所以第一个问题出现了:将 reading/writing 并行化到数据库是否值得,这真的会损害数据库的一致性吗?
如果上面的引用不正确,我对此表示怀疑,有没有办法实现以下目标:
MyIdPartitioner 将被 Spring 管理 - 使用 @Scope("prototype") - 并将递归调用所需的参数传递给它,这样就将事务管理留给 Spring?
这应该可以通过 atomikos (http://www.atomikos.com) 和可选的嵌套事务实现。
如果执行此操作,则在同一根事务的多个线程写入数据库中的相同表时注意避免死锁。
经过进一步阅读,我设法解决了我的问题。有点(正如我现在所见,一开始就没有问题)。
由于我从数据库中读取的数据是块状的,而且我确信在这段时间内不会编辑结果,所以我可以在事务之外进行。
在我的情况下,写入也是安全的,因为我写入的所有值都是唯一的,并且不会发生违反约束的情况。所以我也从那里删除了交易。
我说 "I removed the transaction" 的意思只是在我的 DAO 中覆盖方法的传播模式,例如:
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {
@Transactional(propagation = Propagation.SUPPORTS)
public MyData getData(List<UUID> list) {
// ...
}
}
或者如果您出于某种原因决定需要交易,那么您仍然可以通过将传播设置为 REQUIRED
.
将交易管理留给 Spring
所以解决方案比我想象的要简单得多。
并回答我的其他问题:
Is it worth it to pararellize the reading/writing to the database and can this really hurt the DB consistency?
是的,这是值得的。只要每个线程都有事务,你就很酷。
Is there a way to achieve the following: MyIdPartitioner to be Spring managed - with @Scope("prototype") - and pass the needed arguments for the recursive calls to it and that way leave the transaction management to Spring?
是的,using pool 有一个方法(另一个 Whosebug 问题)。或者您可以将 bean 定义为 @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)
但是如果您需要为其设置参数,它将无法工作,因为实例的每次使用都会为您提供一个新实例。前任。
@Autowire
MyIdsPartitioner partitioner;
public void someMethod() {
...
partitioner.setIds(someIds);
partitioner.setFromIndex(fromIndex);
partitioner.setToIndex(toIndex);
...
}
这将创建 3 个实例,您将无法使用该对象,因为不会设置字段。
简而言之 - 有一种方法,但我一开始并不需要去做。
我正在尝试实施多线程解决方案,以便可以并行化包括读取和写入数据库的业务逻辑。
技术栈:Spring4.0.2,Hibernate 4.3.8
这里有一些代码可以讨论:
配置
@Configuration
public class PartitionersConfig {
@Bean
public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
final ForkJoinPoolFactoryBean poolFactory = new ForkJoinPoolFactoryBean();
return poolFactory;
}
}
服务
@Service
@Transactional
public class MyService {
@Autowired
private OtherService otherService;
@Autowired
private ForkJoinPool forkJoinPool;
@Autowired
private MyDao myDao;
public void performPartitionedActionOnIds() {
final ArrayList<UUID> ids = otherService.getIds();
MyIdPartitioner task = new MyIdsPartitioner(ids, myDao, 0, ids.size() - 1);
forkJoinPool.invoke(task);
}
}
存储库/DAO
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {
public MyData getData(List<UUID> list) {
// ...
}
}
递归动作
public class MyIdsPartitioner extends RecursiveAction {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 100;
private ArrayList<UUID> ids;
private int fromIndex;
private int toIndex;
private MyDao myDao;
public MyIdsPartitioner(ArrayList<UUID> ids, MyDao myDao, int fromIndex, int toIndex) {
this.ids = ids;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
this.myDao = myDao;
}
@Override
protected void compute() {
if (computationSetIsSamllEnough()) {
computeDirectly();
} else {
int leftToIndex = fromIndex + (toIndex - fromIndex) / 2;
MyIdsPartitioner leftPartitioner = new MyIdsPartitioner(ids, myDao, fromIndex, leftToIndex);
MyIdsPartitioner rightPartitioner = new MyIdsPartitioner(ids, myDao, leftToIndex + 1, toIndex);
invokeAll(leftPartitioner, rightPartitioner);
}
}
private boolean computationSetIsSamllEnough() {
return (toIndex - fromIndex) < THRESHOLD;
}
private void computeDirectly() {
final List<UUID> subList = ids.subList(fromIndex, toIndex);
final MyData myData = myDao.getData(sublist);
modifyTheData(myData);
}
private void modifyTheData(MyData myData) {
// ...
// write to DB
}
}
执行此操作后我得到:
No existing transaction found for transaction marked with propagation 'mandatory'
我知道这是完全正常的,因为交易不会通过不同的线程传播。所以一种解决方案是在每个线程中手动创建一个事务 as proposed in another similar question。但这对我来说还不够满意,所以我一直在寻找。
在 Spring 的论坛中我找到了 a discussion on the topic。有一段我觉得很有意思:
"I can imagine one could manually propagate the transaction context to another thread, but I don't think you should really try it. Transactions are bound to single threads with a reason - the basic underlying resource - jdbc connection - is not threadsafe. Using one single connection in multiple threads would break fundamental jdbc request/response contracts and it would be a small wonder if it would work in more then trivial examples."
所以第一个问题出现了:将 reading/writing 并行化到数据库是否值得,这真的会损害数据库的一致性吗?
如果上面的引用不正确,我对此表示怀疑,有没有办法实现以下目标:
MyIdPartitioner 将被 Spring 管理 - 使用 @Scope("prototype") - 并将递归调用所需的参数传递给它,这样就将事务管理留给 Spring?
这应该可以通过 atomikos (http://www.atomikos.com) 和可选的嵌套事务实现。
如果执行此操作,则在同一根事务的多个线程写入数据库中的相同表时注意避免死锁。
经过进一步阅读,我设法解决了我的问题。有点(正如我现在所见,一开始就没有问题)。
由于我从数据库中读取的数据是块状的,而且我确信在这段时间内不会编辑结果,所以我可以在事务之外进行。
在我的情况下,写入也是安全的,因为我写入的所有值都是唯一的,并且不会发生违反约束的情况。所以我也从那里删除了交易。
我说 "I removed the transaction" 的意思只是在我的 DAO 中覆盖方法的传播模式,例如:
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {
@Transactional(propagation = Propagation.SUPPORTS)
public MyData getData(List<UUID> list) {
// ...
}
}
或者如果您出于某种原因决定需要交易,那么您仍然可以通过将传播设置为 REQUIRED
.
所以解决方案比我想象的要简单得多。
并回答我的其他问题:
Is it worth it to pararellize the reading/writing to the database and can this really hurt the DB consistency?
是的,这是值得的。只要每个线程都有事务,你就很酷。
Is there a way to achieve the following: MyIdPartitioner to be Spring managed - with @Scope("prototype") - and pass the needed arguments for the recursive calls to it and that way leave the transaction management to Spring?
是的,using pool 有一个方法(另一个 Whosebug 问题)。或者您可以将 bean 定义为 @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)
但是如果您需要为其设置参数,它将无法工作,因为实例的每次使用都会为您提供一个新实例。前任。
@Autowire
MyIdsPartitioner partitioner;
public void someMethod() {
...
partitioner.setIds(someIds);
partitioner.setFromIndex(fromIndex);
partitioner.setToIndex(toIndex);
...
}
这将创建 3 个实例,您将无法使用该对象,因为不会设置字段。
简而言之 - 有一种方法,但我一开始并不需要去做。