如何处理列表的每个 Future 的结果以及所有 Future 何时在 vert.x 内完成?
How to handle results for each Future of a List and when all futures are completed in vert.x?
我有一个 List
的 Future
并且需要在每个 Future
完成时处理结果。所有 Future 完成后,我需要撰写回复。
我创建期货列表及其处理程序的简化代码:
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
resp1Future .setHandler(new Handler<AsyncResult<MyResponse1>>() {
@Override
public void handle(final AsyncResult<MyResponse1> event) {
final MyResponse1 resp1= event.result();
handleResponse1(obj1, resp1);
}
});
futureMyResponse1Lst.add(resp1Future );
}
在这个阶段,每个 Future 结果都会调用所有处理程序。
但是如果我添加一个 CompositeFuture
只有 CompositeFuture
的 Handler
被调用:
final List<Future> futuresAll = new ArrayList<>();
futuresAll.addAll(futureMyResponse1Lst );
CompositeFuture.all(futuresAll).setHandler(new Handler<AsyncResult<CompositeFuture>>() {
@Override
public void handle(final AsyncResult<CompositeFuture> event) {
// called when all future are completed
logger.debug("handle end of CompositeFuture and create response");
}
});
我找到的所有问题和答案都建议使用 CompositeFuture,但在那种情况下,不会调用 Future 单个处理程序。
如何在每个 Future
结果上调用一个处理程序,并在所有 Future 完成时调用另一个处理程序?我对异步编程和 Vert.x 还很陌生,我使用 CompositeFuture
的方法是否正确?我应该改用 RxJava
吗?怎么样?
您可以创建一个额外的 Future,它将在原始 Futures 处理程序中完成,并将该 Future 用于 CompositeFuture。
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
Future<MyResponse1> resp1FutureComposite = Future.future();
resp1Future.setHandler(event-> {
final MyResponse1 resp1= event.result();
handleResponse1(obj1, resp1);
resp1FutureComposite.compelete(resp1);
});
futureMyResponse1Lst.add(resp1FutureComposite);
}
如果您只关心成功的调用并且不想处理每个未来的错误(然后在 CompositeFuture 处理程序中处理它)那么您可以改用 compose 方法:
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
Future<MyResponse1> resp1FutureComposite = resp1Future.compose(resp1 -> {
handleResponse1(obj1, resp1);
return Future.succeededFuture(resp1);
});
futureMyResponse1Lst.add(resp1FutureComposite);
}
我有一个 List
的 Future
并且需要在每个 Future
完成时处理结果。所有 Future 完成后,我需要撰写回复。
我创建期货列表及其处理程序的简化代码:
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
resp1Future .setHandler(new Handler<AsyncResult<MyResponse1>>() {
@Override
public void handle(final AsyncResult<MyResponse1> event) {
final MyResponse1 resp1= event.result();
handleResponse1(obj1, resp1);
}
});
futureMyResponse1Lst.add(resp1Future );
}
在这个阶段,每个 Future 结果都会调用所有处理程序。
但是如果我添加一个 CompositeFuture
只有 CompositeFuture
的 Handler
被调用:
final List<Future> futuresAll = new ArrayList<>();
futuresAll.addAll(futureMyResponse1Lst );
CompositeFuture.all(futuresAll).setHandler(new Handler<AsyncResult<CompositeFuture>>() {
@Override
public void handle(final AsyncResult<CompositeFuture> event) {
// called when all future are completed
logger.debug("handle end of CompositeFuture and create response");
}
});
我找到的所有问题和答案都建议使用 CompositeFuture,但在那种情况下,不会调用 Future 单个处理程序。
如何在每个 Future
结果上调用一个处理程序,并在所有 Future 完成时调用另一个处理程序?我对异步编程和 Vert.x 还很陌生,我使用 CompositeFuture
的方法是否正确?我应该改用 RxJava
吗?怎么样?
您可以创建一个额外的 Future,它将在原始 Futures 处理程序中完成,并将该 Future 用于 CompositeFuture。
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
Future<MyResponse1> resp1FutureComposite = Future.future();
resp1Future.setHandler(event-> {
final MyResponse1 resp1= event.result();
handleResponse1(obj1, resp1);
resp1FutureComposite.compelete(resp1);
});
futureMyResponse1Lst.add(resp1FutureComposite);
}
如果您只关心成功的调用并且不想处理每个未来的错误(然后在 CompositeFuture 处理程序中处理它)那么您可以改用 compose 方法:
final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);
Future<MyResponse1> resp1FutureComposite = resp1Future.compose(resp1 -> {
handleResponse1(obj1, resp1);
return Future.succeededFuture(resp1);
});
futureMyResponse1Lst.add(resp1FutureComposite);
}