如何处理列表的每个 Future 的结果以及所有 Future 何时在 vert.x 内完成?

How to handle results for each Future of a List and when all futures are completed in vert.x?

我有一个 ListFuture 并且需要在每个 Future 完成时处理结果。所有 Future 完成后,我需要撰写回复。

我创建期货列表及其处理程序的简化代码:

    final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
    for (final MyObject1 obj1: listOfObject1) {
        final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

        resp1Future .setHandler(new Handler<AsyncResult<MyResponse1>>() {

            @Override
            public void handle(final AsyncResult<MyResponse1> event) {
                final MyResponse1 resp1= event.result();
                handleResponse1(obj1, resp1);
            }

        });
        futureMyResponse1Lst.add(resp1Future );
    }

在这个阶段,每个 Future 结果都会调用所有处理程序。

但是如果我添加一个 CompositeFuture 只有 CompositeFutureHandler 被调用:

    final List<Future> futuresAll = new ArrayList<>();
    futuresAll.addAll(futureMyResponse1Lst );
    CompositeFuture.all(futuresAll).setHandler(new Handler<AsyncResult<CompositeFuture>>() {

        @Override
        public void handle(final AsyncResult<CompositeFuture> event) {
            // called when all future are completed
            logger.debug("handle end of CompositeFuture and create response");
        }
    });

我找到的所有问题和答案都建议使用 CompositeFuture,但在那种情况下,不会调用 Future 单个处理程序。

如何在每个 Future 结果上调用一个处理程序,并在所有 Future 完成时调用另一个处理程序?我对异步编程和 Vert.x 还很陌生,我使用 CompositeFuture 的方法是否正确?我应该改用 RxJava 吗?怎么样?

您可以创建一个额外的 Future,它将在原始 Futures 处理程序中完成,并将该 Future 用于 CompositeFuture。

final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
    final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

    Future<MyResponse1> resp1FutureComposite = Future.future();
    resp1Future.setHandler(event-> {
            final MyResponse1 resp1= event.result();
            handleResponse1(obj1, resp1);
            resp1FutureComposite.compelete(resp1);
    });
    futureMyResponse1Lst.add(resp1FutureComposite);
}

如果您只关心成功的调用并且不想处理每个未来的错误(然后在 CompositeFuture 处理程序中处理它)那么您可以改用 compose 方法:

final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
    final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

    Future<MyResponse1> resp1FutureComposite = resp1Future.compose(resp1 -> {
            handleResponse1(obj1, resp1);
            return Future.succeededFuture(resp1);
    });
    futureMyResponse1Lst.add(resp1FutureComposite);
}