Java 流在完成上一张地图之前开始下一张地图

Java stream start next map before finishing previous one

我有2个执行者:

ExecutorService executorDownload = Executors.newFixedThreadPool(n);
ExecutorService executorCalculate = Executors.newFixedThreadPool(m);

首先我需要将任务放入executorDownload,然后在它们完成后将它们放入executorCalculate,然后得到结果。我写了下一个流:

long count = IntStream.range(0, TASK_NUMBER)
            .boxed()
            .parallel()
            .map(i -> executorDownload.submit(new Download(i)))
            .map(future -> calculateResultFuture(executorCalculate, future))
            .filter(Objects::nonNull)
            .filter(Main::isFutureCalculated)
            .count();

public static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate, Future<DownloadResult> future) {
    try {
        return executorCalculate.submit(new Calculate(future.get()));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return null;
}

public static boolean isFutureCalculated(Future<CalculateResult> future) {
    try {
        return future.get().found;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return false;
}

是否可以开始

.map(future -> calculateResultFuture(executorCalculate, future))

之前

.map(i -> executorDownload.submit(new Download(i)))

结束。我需要在第一张地图开始后立即开始第二张地图。

如果我正确理解你的问题,你需要在获得特征结果后才开始第二个 .map()。 您可以添加 .peek(Main::waitFeatureResult) 和另一种方法 waitFeatureResult:

  public static void waitFeatureResult(Future<CalculateResult> future) {
        while (future.isDone()) {
            break;
        }
    }

或编辑您的方法:

public static boolean isFutureCalculated(Future<CalculateResult> future) {
    try {
        while(future.isDone){
          return future.get().found;
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return false;
    }       
}

你只需要明白流是一个一个地处理元素,这意味着一个元素在下一个元素进入第一个中间步骤之前遍历整个管道(而不是所有元素一起通过每个步骤,当然除了需要所有元素才能完成其工作的中间步骤)。

这意味着,在您的情况下,第一个元素的 .get() 将阻塞,阻止第二个元素进入第一个任务。

要强制所有元素通过第一次提交(也适用于第二次提交),您需要在开始阻塞之前强制流提交所有任务,例如:

List<Future<DownloadResult>> downloadTasks = IntStream.range(0, TASK_NUMBER)
        .mapToObj(i -> executorDownload.submit(new Download(i)))
        .collect(Collectors.toList());
        //removed .parallel()

这将强制启动所有异步任务,之后您可以对第二个异步批处理执行相同操作:

List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
        .map(future -> calculateResultFuture(executorCalculate, future))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

那也是,将强制所有任务提交给第二个执行者。从这里,您可以 .get() 而无需进行不必要的等待:

long count = calculateResults.stream()
        .filter(Main::isFutureCalculated)
        .count();

现在,虽然这将消除批次内元素之间不必要的等待,但批次之间仍有可能出现不必要的等待(如果第一个元素完成第一个任务,它将等待所有其余元素完成第一个批次,然后再继续第二批)。为了解决这个问题,您可能需要不同的实现。这是为此设计的可完成期货链:

List<Completable<CalculateResult>> calculateResult = IntStream.range(0, TASK_NUMBER)
     .mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
             .thenApplyAsync(downloadResult -> calculateResultFuture(downloadResult), executorCalculate))
     .collect(Collectors.toList());

long count = calculateResult.stream().map(f -> isFutureCalculated(f)).count();

thenApplyAsync 将在第一个任务完成时让第二个任务接管,按元素计算。

当然,这需要您稍微更改 API,以便直接调用下载方法。我使用 callDownload(i) 到 运行 与 new Download(i).call() 相同的逻辑 运行。 calculateResultFuture() 也将更改为删除 Future 参数。