在 Java 中链接可变数量的承诺 (CompletableFuture)

Chaining a variable number of promises (CompletableFuture) in Java

给定函数列表,其中 Context 是一些自定义类型:

List<Function<Context, CompletableFuture<Context>>> preprocessors;

我需要 CompletableFuture<Context> 执行每一个函数,以便将每次迭代的异步结果提供给下一次迭代。因此,对于列表中包含 3 个元素的以下语句,我需要一个通用解决方案(对于可变列表大小):

Context context;

CompletableFuture<Context> promise = preprocessors.get(0).apply(context)
    .thenCompose((c) -> preprocessors.get(1).apply(c))
    .thenCompose((c) -> preprocessors.get(2).apply(c));

有什么想法吗?

为了去掉索引,上面的代码也可以是这样的:

Iterator<Function<Context, CompletableFuture<Context>>> it = preprecessors.iterator();

Context context;

CompletableFuture<Context> promise = it.next().apply(context)
    .thenCompose((c) -> it.next().apply(c))
    .thenCompose((c) -> it.next().apply(c));

不过,我如何将其进一步归纳为可变元素计数?

也许像 RxJava 这样的东西可能有用:

    Flowable.fromIterable(preprocessors)
            .map(function -> function.apply(context))
            .flatMap(Flowable::fromFuture)
            .subscribe(c -> {
                // Whatever it is you want to do with the context that is returned
            });

虽然我必须承认我不完全确定这是否会确保所有期货的顺序执行

https://github.com/ReactiveX/RxJava

调查 reduce() 似乎很难解决对第一个元素的特殊处理,并且无法保证顺序处理。使用常规 Java 你可以写

List<Function<Context, CompletableFuture<Context>>> preprocessors;
Context context;
Iterator<Function<Context, CompletableFuture<Context>>> itr = preprocessors.iterator();

CompletableFuture<Context> promise = itr.next().apply(context);
while(itr.hasNext())
    promise = promise.thenCompose(c -> itr.next().apply(c));

假设您只对最后一个 CompleteableFuture(承诺)的结果感兴趣,那么您可以获取每个迭代承诺并根据它进行组合。

采用索引或 foreach 循环,您可以使用以下代码片段来达到您的目的:

CompleteableFuture<Context> promise = CompleteableFuture.completedFuture(context);
for(Function<Context, CompletableFuture<Context>> preprocessor: preprocessors) {
  promise = promise.thenCompose(ctx -> preprocessor.apply(ctx));
}

我想我是这样用 reduce 做到的:

        CompletableFuture<Context> res =
            preprocessors.stream()
                         .reduce(CompletableFuture.completedFuture(context),
                                 (future, processor) -> future.thenCompose(processor::apply),
                                 (old, current) -> current);

累加器(第二个参数)接收未来和处理器并产生下一个未来。所以组合器(第三个参数)可以安全地丢弃 "old" 未来而只是 return 新的。

未经测试!它可能不像宣传的那样有效:)