Vert.x: 如何等待未来完成
Vert.x: How to wait for a future to complete
有没有办法在不阻塞事件循环的情况下等待未来完成?
查询用例示例 Mongo:
Future<Result> dbFut = Future.future();
mongo.findOne("myusers", myQuery, new JsonObject(), res -> {
if(res.succeeded()) {
...
dbFut.complete(res.result());
}
else {
...
dbFut.fail(res.cause());
}
}
});
// Here I need the result of the DB query
if(dbFut.succeeded()) {
doSomethingWith(dbFut.result());
}
else {
error();
}
我知道 doSomethingWith(dbFut.result());
可以移动到处理程序,但如果它很长,代码将变得不可读(回调地狱?)这是正确的解决方案吗?这是没有额外库的万能解决方案吗?
我知道 rxJava 简化了代码,但我不知道,学习 Vert.x 和 rxJava 实在是太多了。
我也想试试vertx-sync
。我把依赖放在 pom.xml
;一切都下载好了,但是当我启动我的应用程序时,出现以下错误
maurice@mickey> java \
-javaagent:~/.m2/repository/co/paralleluniverse/quasar-core/0.7.5/quasar-core-0.7.5-jdk8.jar \
-jar target/app-dev-0.1-fat.jar \
-conf conf/config.json
Error opening zip file or JAR manifest missing : ~/.m2/repository/co/paralleluniverse/quasar-core/0.7.5/quasar-core-0.7.5-jdk8.jar
Error occurred during initialization of VM
agent library failed to init: instrument
我知道错误的一般含义,但我不知道在那种情况下...我尝试 google 解决它,但没有找到关于将哪个清单放在哪里的任何明确解释.和以前一样,除非是强制性的,否则我更喜欢一次学一件事。
所以,回到问题:有没有办法让 "basic" Vert.x 等待未来而不扰动事件循环?
您可以为未来设置一个处理程序,以便在完成或失败时执行:
Future<Result> dbFut = Future.future();
mongo.findOne("myusers", myQuery, new JsonObject(), res -> {
if(res.succeeded()) {
...
dbFut.complete(res.result());
}
else {
...
dbFut.fail(res.cause());
}
}
});
dbFut.setHandler(asyncResult -> {
if(asyncResult.succeeded()) {
// your logic here
}
});
这是一种纯粹的Vert.x方式,不会阻塞事件循环
我同意您不应阻塞 Vertx 处理管道,但我对该规则有一个例外:启动。按照设计,我想在我的 HTTP 服务器初始化时阻止。
此代码可能对您有所帮助:
/**
* @return null when waiting on {@code Future<Void>}
*/
@Nullable
public static <T>
T awaitComplete(Future<T> f)
throws Throwable
{
final Object lock = new Object();
final AtomicReference<AsyncResult<T>> resultRef = new AtomicReference<>(null);
synchronized (lock)
{
// We *must* be locked before registering a callback.
// If result is ready, the callback is called immediately!
f.onComplete(
(AsyncResult<T> result) ->
{
resultRef.set(result);
synchronized (lock) {
lock.notify();
}
});
do {
// Nested sync on lock is fine. If we get a spurious wake-up before resultRef is set, we need to
// reacquire the lock, then wait again.
// Ref:
synchronized (lock)
{
// @Blocking
lock.wait();
}
}
while (null == resultRef.get());
}
final AsyncResult<T> result = resultRef.get();
@Nullable
final Throwable t = result.cause();
if (null != t) {
throw t;
}
@Nullable
final T x = result.result();
return x;
}
有没有办法在不阻塞事件循环的情况下等待未来完成?
查询用例示例 Mongo:
Future<Result> dbFut = Future.future();
mongo.findOne("myusers", myQuery, new JsonObject(), res -> {
if(res.succeeded()) {
...
dbFut.complete(res.result());
}
else {
...
dbFut.fail(res.cause());
}
}
});
// Here I need the result of the DB query
if(dbFut.succeeded()) {
doSomethingWith(dbFut.result());
}
else {
error();
}
我知道 doSomethingWith(dbFut.result());
可以移动到处理程序,但如果它很长,代码将变得不可读(回调地狱?)这是正确的解决方案吗?这是没有额外库的万能解决方案吗?
我知道 rxJava 简化了代码,但我不知道,学习 Vert.x 和 rxJava 实在是太多了。
我也想试试vertx-sync
。我把依赖放在 pom.xml
;一切都下载好了,但是当我启动我的应用程序时,出现以下错误
maurice@mickey> java \
-javaagent:~/.m2/repository/co/paralleluniverse/quasar-core/0.7.5/quasar-core-0.7.5-jdk8.jar \
-jar target/app-dev-0.1-fat.jar \
-conf conf/config.json
Error opening zip file or JAR manifest missing : ~/.m2/repository/co/paralleluniverse/quasar-core/0.7.5/quasar-core-0.7.5-jdk8.jar
Error occurred during initialization of VM
agent library failed to init: instrument
我知道错误的一般含义,但我不知道在那种情况下...我尝试 google 解决它,但没有找到关于将哪个清单放在哪里的任何明确解释.和以前一样,除非是强制性的,否则我更喜欢一次学一件事。
所以,回到问题:有没有办法让 "basic" Vert.x 等待未来而不扰动事件循环?
您可以为未来设置一个处理程序,以便在完成或失败时执行:
Future<Result> dbFut = Future.future();
mongo.findOne("myusers", myQuery, new JsonObject(), res -> {
if(res.succeeded()) {
...
dbFut.complete(res.result());
}
else {
...
dbFut.fail(res.cause());
}
}
});
dbFut.setHandler(asyncResult -> {
if(asyncResult.succeeded()) {
// your logic here
}
});
这是一种纯粹的Vert.x方式,不会阻塞事件循环
我同意您不应阻塞 Vertx 处理管道,但我对该规则有一个例外:启动。按照设计,我想在我的 HTTP 服务器初始化时阻止。
此代码可能对您有所帮助:
/**
* @return null when waiting on {@code Future<Void>}
*/
@Nullable
public static <T>
T awaitComplete(Future<T> f)
throws Throwable
{
final Object lock = new Object();
final AtomicReference<AsyncResult<T>> resultRef = new AtomicReference<>(null);
synchronized (lock)
{
// We *must* be locked before registering a callback.
// If result is ready, the callback is called immediately!
f.onComplete(
(AsyncResult<T> result) ->
{
resultRef.set(result);
synchronized (lock) {
lock.notify();
}
});
do {
// Nested sync on lock is fine. If we get a spurious wake-up before resultRef is set, we need to
// reacquire the lock, then wait again.
// Ref:
synchronized (lock)
{
// @Blocking
lock.wait();
}
}
while (null == resultRef.get());
}
final AsyncResult<T> result = resultRef.get();
@Nullable
final Throwable t = result.cause();
if (null != t) {
throw t;
}
@Nullable
final T x = result.result();
return x;
}