使用 CompletableFuture.supplyAsync 返回多个值

Returning multiple values with CompletableFuture.supplyAsync

我正在编写一个程序来从源下载历史报价。源每天通过 http 提供需要解析和处理的文件。该程序使用不同阶段的 CompletableFuture 并行下载多个文件。第一阶段是使用 HttpClient 进行 Http 调用并获得响应。

getHttpResponse() 方法 return 是一个 CloseableHttpResponse 对象。我还想 return 一个 url ,为此提出了这个 http 请求。最简单的方法是拥有一个包含这两个字段的包装器对象,但我觉得只包含这两个字段的 class 太过分了。 CompletableFuture 或 Streams 有什么方法可以实现这一点吗?

  filesToDownload.stream()
                 .map(url -> CompletableFuture.supplyAsync(() -> this.getHttpResponse(url), this.executor) )
                 .map(httpResponseFuture -> httpResponseFuture.thenAccept(t -> processHttpResponse(t)))
                 .count();

不清楚为什么要不惜一切代价引入 Stream API。将 CompletableFuture 使用拆分为两个 map 操作会导致问题,否则不会存在。除此之外,使用 map 作为副作用是对 Stream API 的滥用。如果 filesToDownload 是一个已知大小的流源(就像几乎每个集合),这可能会在 Java 9 中完全中断。然后,count() 将简单地 return 已知大小,而不处理 map 操作的功能......

如果你想将 URLCloseableHttpResponse 传递给 processHttpResponse,你可以很简单地做到这一点:

filesToDownload.forEach(url ->
    CompletableFuture.supplyAsync(() -> this.getHttpResponse(url), this.executor)
                     .thenAccept(  t -> processHttpResponse(t, url))
);

即使,如果您使用 Stream API 来收集结果,也没有理由将 CompletableFuture 拆分为多个 map 操作:

List<…> result = filesToDownload.stream()
  .map(url -> CompletableFuture.supplyAsync(() -> this.getHttpResponse(url), this.executor)
                               .thenApply(   t -> processHttpResponse(t, url))  )
  .collect(Collectors.toList())
  .stream()
  .map(CompletableFuture::join)
  .collect(Collectors.toList());

请注意,在等待第二个 Stream 操作的任何结果之前,这会将 CompletableFuture 收集到 List 中。这比使用并行 Stream 操作更可取,因为它确保 所有 异步操作已提交,然后开始等待。

使用单个 Stream 管道意味着在提交第二个作业之前等待第一个作业完成,使用并行 Stream 只会减少而不是解决问题。这将取决于 Stream 实现的执行策略(默认 Fork/Join 池),这会干扰您指定的执行程序的实际策略。例如,如果指定的执行程序应该使用比 CPU 个内核更多的线程,则 Stream 仍然会一次提交与内核一样多的作业——如果在默认 Fork/Join池.

相比之下,上述解决方案的行为将完全由指定执行程序的执行策略控制。