如何在链式 CompletableFuture 中扇出?

How to Fan Out Inside Chained CompletableFuture?

我想链接一个 CompletableFuture,使其在处理过程中散开。我的意思是我有一个针对列表的开放 CompletableFuture,我想对该列表中的每个项目应用计算。

第一步是调用 m_myApi.getResponse(request, executor) 发出异步调用。

该异步调用的结果有一个 getCandidates 方法。我想并行解析所有这些候选人。

目前,我的代码对它们进行串行解析

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}

我想要这样的东西:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}

已经有一个 thenApply 的版本将 Executor 作为附加参数。

<U> CompletionStage<U>  thenApplyAsync​(Function<? super T,​? extends U> fn, Executor executor)

如果你在那里传递一个 forkjoin 执行器,那么 parallel stream inside the lambda will use the passed executor instead of the common pool

中所述,如果 Executor 恰好是一个 Fork/Join 池,则有一个(未记录的)功能在其工作线程之一中启动并行流将使用该执行器执行并行操作。

当你想支持任意 Executor 实现时,事情就更复杂了。一种解决方案看起来像

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}

第一件关键的事情是我们必须在开始等待之前提交所有潜在的异步作业,以启用执行程序可能支持的最大并行度。因此,我们必须在第一步中收集 List 中的所有期货。

在第二步中,我们可以迭代列表和 join 所有期货。如果执行器是一个 Fork/Join 池并且未来尚未完成,它会检测到这一点并启动补偿线程以重新获得配置的并行度。但是,对于任意个执行者,我们不能假设这样的特征。最值得注意的是,如果执行器是单线程执行器,这可能会导致死锁。

因此,该解决方案使用CompletableFuture.allOf仅在所有期货都已完成时才执行迭代和加入所有期货的操作。因此,此解决方案永远不会阻塞执行程序的线程,使其与任何 Executor 实现兼容。