Play Framework 2.5.4 Async Hibernate operations
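How can I use Akka to perform operations on a database through Hibernate without blocking the web client?
Update:
It turns out the error was caused by the dao.get() method. I changed the start() method to take the actual object instead of its database id. Now I get no error, but nothing happens either (as I said before, it gets stuck at em.merge()).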
public CompletionStage<Result> start(SomeObject object) {
    ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
    return CompletableFuture.supplyAsync(() -> doStuff(object), play.libs.concurrent.HttpExecution.fromThread(ec))
        .thenApply(i -> ok("Got result: " + i));
}
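The point of looking up a separate dispatcher is to keep blocking database calls off the threads that serve HTTP requests. A minimal plain-Java sketch of that idea follows, with a fixed thread pool standing in for the Akka dispatcher. The class DbPoolSketch, the pool size of 4 and the slowQuery method are assumptions made for illustration; they are not part of Play, Akka or the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DbPoolSketch {
    // Hypothetical stand-in for the "akka.actor.db-context" dispatcher.
    static final ExecutorService DB_POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "db-pool");
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    });

    // Hypothetical blocking call standing in for doStuff(object).
    static String slowQuery(int id) {
        try {
            Thread.sleep(50); // simulate a blocking JDBC round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row-" + id + " on " + Thread.currentThread().getName();
    }

    // The caller gets a future back immediately; the blocking work runs on DB_POOL.
    static CompletableFuture<String> start(int id) {
        return CompletableFuture.supplyAsync(() -> slowQuery(id), DB_POOL)
                .thenApply(r -> "Got result: " + r);
    }

    public static void main(String[] args) {
        System.out.println(start(1).join());
    }
}
```

The pool only moves the blocking call elsewhere; it says nothing about which session or transaction that call may use, which is what the rest of this question is about.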
Obsolete:
If I try something like this:
@Transactional
public CompletionStage<Result> start(Long id) {
    ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
    return CompletableFuture.supplyAsync(() -> doStuff(dao.get(id)), play.libs.concurrent.HttpExecution.fromThread(ec))
        .thenApply(i -> ok("Got result: " + i));
}
where doStuff only calls entityManager.merge(), I get:
[CompletionException: org.hibernate.SessionException: Session is closed!]
or
[CompletionException: java.lang.IllegalStateException: EntityManager is closed]
When I start the process above with the code below:
@Transactional
public Result mainMethod() {
    List<SomeObject> allObjects = dao.getAll();
    int size = allObjects.size();
    for (int i = 0; i < size; i++) {
        start(allObjects.get(i).getId());
    }
    return ok("Started");
}
the newly created threads (actors) go into an infinite loop when they attempt the database operation.
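One detail of the loop above is that it discards the CompletionStage returned by each start() call, so a failure inside any of them never reaches mainMethod. A minimal plain-Java sketch of keeping those stages and combining them with CompletableFuture.allOf follows. The class FanOutSketch, the simplified start() and the simulated failure for id 2 are assumptions made for illustration, not the original controller code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FanOutSketch {
    // Hypothetical stand-in for start(id); id 2 fails to show error propagation.
    static CompletableFuture<String> start(long id) {
        return CompletableFuture.supplyAsync(() -> {
            if (id == 2L) {
                throw new IllegalStateException("EntityManager is closed");
            }
            return "merged " + id;
        });
    }

    // Keeps every future so that completion and failures stay observable.
    static CompletableFuture<String> startAll(List<Long> ids) {
        List<CompletableFuture<String>> started = new ArrayList<>();
        for (Long id : ids) {
            started.add(start(id));
        }
        return CompletableFuture.allOf(started.toArray(new CompletableFuture[0]))
                .handle((ignored, error) -> {
                    if (error == null) {
                        return "All " + started.size() + " finished";
                    }
                    // allOf reports the failure wrapped in a CompletionException.
                    Throwable root = error.getCause() != null ? error.getCause() : error;
                    return "Failed: " + root.getMessage();
                });
    }

    public static void main(String[] args) {
        System.out.println(startAll(Arrays.asList(1L, 3L)).join());
        System.out.println(startAll(Arrays.asList(1L, 2L, 3L)).join());
    }
}
```

In a controller, the combined future could be returned as the CompletionStage<Result> of the action instead of answering "Started" right away, but whether that fits depends on how long the batch is expected to run.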
Thanks!
Full stack trace:
play.api.http.HttpErrorHandlerExceptions$$anon: Execution exception[[CompletionException: org.hibernate.SessionException: Session is closed!]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:280)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:206)
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:98)
    at play.core.server.netty.PlayRequestHandler$$anonfun$$anonfun$apply.applyOrElse(PlayRequestHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$$anonfun$apply.applyOrElse(PlayRequestHandler.scala:99)
    at scala.concurrent.Future$$anonfun$recoverWith.apply(Future.scala:344)
    at scala.concurrent.Future$$anonfun$recoverWith.apply(Future.scala:343)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109)
    at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.java8.FuturesConvertersImpl$P.accept(FutureConvertersImpl.scala:94)
    at scala.concurrent.java8.FuturesConvertersImpl$P.accept(FutureConvertersImpl.scala:89)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
    at play.core.j.HttpExecutionContext$$anon.run(HttpExecutionContext.scala:56)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.hibernate.SessionException: Session is closed!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    ... 7 common frames omitted
Caused by: org.hibernate.SessionException: Session is closed!
    at org.hibernate.internal.AbstractSessionImpl.errorIfClosed(AbstractSessionImpl.java:133)
    at org.hibernate.internal.SessionImpl.setCacheMode(SessionImpl.java:1455)
    at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1144)
    at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1068)
    at daos.dao.get(dao.java:45)
    at controllers.DemoController.lambda$start$0(DemoController.java:195)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 7 common frames omitted
LE (later edit):
I tried:
ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
return CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", true, () -> doStuff(dao.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
    .thenApply(i -> ok("Got result: " + i));
and I get the same error.
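The @Transactional annotation is on the synchronous method. I guess you need the transaction inside the async block. Instead of the annotation, you can use the withTransaction method of the JPA API.
Its signature (from the Play Java API):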
/**
* Run a block of code in a JPA transaction.
*
* @param name The persistence unit name
* @param readOnly Is the transaction read-only?
* @param block Block of code to execute
* @param <T> type of result
* @return code execution result
*/
public <T> T withTransaction(String name, boolean readOnly, Supplier<T> block);
It should look like this:
Inject JPAApi into the controller:
@Inject
JPAApi jpa;
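Implement the async execution with a call to jpa inside the supplier:
CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", true, () -> doStuff(dao.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
    .thenApply(i -> ok("Got result: " + i));
Note that the second argument of withTransaction, true, marks the transaction as read-only (as in your case); if doStuff writes to the database, as a merge() does, you will probably want false instead.
- Change the DAO class to use the injected JPAApi.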
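A likely reason the annotation does not help is that the transactional wrapper finishes as soon as the action method returns its CompletionStage, so the session may already be closed when the lambda finally runs on the other dispatcher. The following self-contained simulation illustrates that ordering in plain Java. FakeSession, withSession and the gate future are hypothetical stand-ins chosen for this sketch; they are not Play or Hibernate APIs, and the gate only exists to make the ordering deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SessionScopeSketch {
    // Hypothetical stand-in for an EntityManager / Hibernate Session.
    static final class FakeSession {
        private volatile boolean open = true;

        String find(long id) {
            if (!open) {
                throw new IllegalStateException("Session is closed!");
            }
            return "entity-" + id;
        }

        void close() {
            open = false;
        }
    }

    // Same shape as withTransaction: open, run the block, always close.
    static <T> T withSession(Function<FakeSession, T> block) {
        FakeSession session = new FakeSession();
        try {
            return block.apply(session);
        } finally {
            session.close();
        }
    }

    // Like @Transactional on a method returning a future:
    // the session is closed once the future is created, before the work runs.
    static CompletableFuture<String> wrappedOutside(long id, CompletableFuture<Void> gate) {
        return withSession(session -> gate.thenApplyAsync(v -> session.find(id)));
    }

    // Like calling withTransaction inside the async block:
    // the session is opened and closed by the thread that does the work.
    static CompletableFuture<String> wrappedInside(long id, CompletableFuture<Void> gate) {
        return gate.thenApplyAsync(v -> withSession(session -> session.find(id)));
    }

    // Turns a finished future into its value or its root error message.
    static String describe(CompletableFuture<String> future) {
        return future.handle((value, error) -> {
            if (error == null) {
                return value;
            }
            Throwable root = error.getCause() != null ? error.getCause() : error;
            return "failed: " + root.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate = new CompletableFuture<>();
        CompletableFuture<String> outside = wrappedOutside(1L, gate);
        CompletableFuture<String> inside = wrappedInside(1L, gate);
        gate.complete(null); // async work starts only after both methods returned
        System.out.println(describe(inside));
        System.out.println(describe(outside));
    }
}
```

In this simulation only wrappedInside finds the entity. The same reasoning suggests that the DAO has to obtain its EntityManager from the transaction opened inside the lambda rather than from one tied to the original request.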
I solved the problem by following @asch's suggestion:
Controller:
@Inject
private JPAApi jpa;
[...]
return CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", false, ()-> doStuff(objectDAO.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
.thenApply(i -> ok("Got result: " + i));
And in the class that contains the doStuff method:
private static final JPAApi jpaApi = Play.current().injector().instanceOf(JPAApi.class);
[...]
jpaApi.withTransaction(() -> {
EntityManager em = jpaApi.em();
em.merge(object);
});
[...]