CompletableFuture.allOff 完成,即使其列表中的一个 CompletableFuture 尚未完成
CompletableFuture.allOff completes even if one CompletableFuture in its list is not yet finished
我有 2 个 CompletableFutures。 task2 只应在 task1 完成后开始。然后,我需要等待所有任务完成。在我下面的代码中,程序在 task1 结束后结束。 task2 开始但没有完成。为什么会发生这种情况的任何想法?另外,为什么列表只包含1个条目,而在代码中,我添加了2个?
代码:
public void testFutures () throws Exception {
List<CompletableFuture<Void>> futures = new ArrayList<>();
CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task1 start");
try {
Thread.sleep(5000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task1 done");
return null;
});
task1.whenComplete( (x, y) -> {
CompletableFuture<Void> task2 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task2 start");
try {
Thread.sleep(2000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task2 done");
return null;
});
futures.add(task2);
});
futures.add(task1);
// wait for the calls to finish
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete( (x, y) -> {
System.out.println(" all tasks done " + futures.size());
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
输出:
task1 start
task1 done
all tasks done 1
task2 start
你有两个问题。
首先,您已经创建了关于何时将 task2
添加到您的期货列表的竞争条件。在你执行这一行的时候——
CompletableFuture.allOf(...).get();
—我称之为 终止 getter,列表中只有 task1
。通过输出它的大小自己看看:
// wait for the calls to finish
try {
System.out.println("# of futures: " + futures.size()); // 1
task2
仍然 运行s 最终 ,因为你用 whenComplete()
安排了它。但触发它的不是你的终止 getter。
回想一下,我说过这是一个竞争条件。为了自己演示这一点,在终止 getter 之前添加一个 sleep()
,如下所示:
try {
Thread.sleep(6000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
// wait for the calls to finish
try {
System.out.println("# of futures: " + futures.size()); // 2
那么你已经给了它足够的时间来添加 task2
.
但事情是这样的。 现在是终止getter触发两个任务吗?
还是不行! 这是第二个问题:您几乎总是想使用 thenRun()
、thenAccept()
、thenApply()
之一, thenCompose()
方法。这些方法链接你的未来,即使每个阶段都依赖于前一个阶段,这样你的终止getter 实际上 等待整个链完成。 whenComplete()
是一种启动完全不相关管道的特殊方法,因此不受终止 get()
.
的影响
在你的情况下,你想使用 thenRun()
,像这样:
task1.thenRun( ignore -> {
好的,那么我们如何结合所有这些?
public static void testFutures () throws Exception {
CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task1 start");
try {
Thread.sleep(5000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task1 done");
return null;
});
CompletableFuture<Void> futuresChain = task1.thenRun( () -> {
System.out.println(" task2 start");
try {
Thread.sleep(2000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task2 done");
});
// wait for the calls to finish
try {
futuresChain.thenRun( () -> {
System.out.println(" all tasks done ");
}).toCompletableFuture().get();
} catch (Exception e) {
e.printStackTrace();
}
}
输出:
task1 start
task1 done
task2 start
task2 done
all tasks done
你看,第一个任务只需要supplyAsync()
。您希望在该任务之后按顺序 运行 task2
,因此 thenRun()
将为您进行调度(supplyAsync()
ing)。所以你也不需要一系列的期货。 allOf()
用于 运行 并行 任务 ,并等待所有任务完成。
让我们先清理你的代码。
让我们定义一个方法来休眠,这样它就不会把水搅浑:
private static void sleep(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
那我们就把任务分开,用正确的方法:
private static CompletableFuture<Void> task1() {
return CompletableFuture.runAsync(() -> {
System.out.println(" task1 start");
sleep(5);
System.out.println(" task1 done");
});
}
private static CompletableFuture<Void> task2() {
return CompletableFuture.runAsync(() -> {
System.out.println(" task2 start");
sleep(2);
System.out.println(" task2 done");
});
}
您需要了解 CompletableFuture
方法的链接已经完全按照您的要求进行,它们 运行 下一个 阶段,在 上一篇一篇已经结束。您可以通过以下方式使您的代码变得更加简单:
public static void main(String[] args) throws Exception {
testFutures();
}
private static void testFutures() throws Exception {
CompletableFuture<Void> both = task1().thenCompose(ignoreMe -> task2());
both.get();
System.out.println("both done");
}
我有 2 个 CompletableFutures。 task2 只应在 task1 完成后开始。然后,我需要等待所有任务完成。在我下面的代码中,程序在 task1 结束后结束。 task2 开始但没有完成。为什么会发生这种情况的任何想法?另外,为什么列表只包含1个条目,而在代码中,我添加了2个?
代码:
public void testFutures () throws Exception {
List<CompletableFuture<Void>> futures = new ArrayList<>();
CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task1 start");
try {
Thread.sleep(5000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task1 done");
return null;
});
task1.whenComplete( (x, y) -> {
CompletableFuture<Void> task2 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task2 start");
try {
Thread.sleep(2000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task2 done");
return null;
});
futures.add(task2);
});
futures.add(task1);
// wait for the calls to finish
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete( (x, y) -> {
System.out.println(" all tasks done " + futures.size());
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
输出:
task1 start
task1 done
all tasks done 1
task2 start
你有两个问题。
首先,您已经创建了关于何时将 task2
添加到您的期货列表的竞争条件。在你执行这一行的时候——
CompletableFuture.allOf(...).get();
—我称之为 终止 getter,列表中只有 task1
。通过输出它的大小自己看看:
// wait for the calls to finish
try {
System.out.println("# of futures: " + futures.size()); // 1
task2
仍然 运行s 最终 ,因为你用 whenComplete()
安排了它。但触发它的不是你的终止 getter。
回想一下,我说过这是一个竞争条件。为了自己演示这一点,在终止 getter 之前添加一个 sleep()
,如下所示:
try {
Thread.sleep(6000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
// wait for the calls to finish
try {
System.out.println("# of futures: " + futures.size()); // 2
那么你已经给了它足够的时间来添加 task2
.
但事情是这样的。 现在是终止getter触发两个任务吗?
还是不行! 这是第二个问题:您几乎总是想使用 thenRun()
、thenAccept()
、thenApply()
之一, thenCompose()
方法。这些方法链接你的未来,即使每个阶段都依赖于前一个阶段,这样你的终止getter 实际上 等待整个链完成。 whenComplete()
是一种启动完全不相关管道的特殊方法,因此不受终止 get()
.
在你的情况下,你想使用 thenRun()
,像这样:
task1.thenRun( ignore -> {
好的,那么我们如何结合所有这些?
public static void testFutures () throws Exception {
CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
System.out.println(" task1 start");
try {
Thread.sleep(5000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task1 done");
return null;
});
CompletableFuture<Void> futuresChain = task1.thenRun( () -> {
System.out.println(" task2 start");
try {
Thread.sleep(2000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(" task2 done");
});
// wait for the calls to finish
try {
futuresChain.thenRun( () -> {
System.out.println(" all tasks done ");
}).toCompletableFuture().get();
} catch (Exception e) {
e.printStackTrace();
}
}
输出:
task1 start
task1 done
task2 start
task2 done
all tasks done
你看,第一个任务只需要supplyAsync()
。您希望在该任务之后按顺序 运行 task2
,因此 thenRun()
将为您进行调度(supplyAsync()
ing)。所以你也不需要一系列的期货。 allOf()
用于 运行 并行 任务 ,并等待所有任务完成。
让我们先清理你的代码。
让我们定义一个方法来休眠,这样它就不会把水搅浑:
private static void sleep(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
那我们就把任务分开,用正确的方法:
private static CompletableFuture<Void> task1() {
return CompletableFuture.runAsync(() -> {
System.out.println(" task1 start");
sleep(5);
System.out.println(" task1 done");
});
}
private static CompletableFuture<Void> task2() {
return CompletableFuture.runAsync(() -> {
System.out.println(" task2 start");
sleep(2);
System.out.println(" task2 done");
});
}
您需要了解 CompletableFuture
方法的链接已经完全按照您的要求进行,它们 运行 下一个 阶段,在 上一篇一篇已经结束。您可以通过以下方式使您的代码变得更加简单:
public static void main(String[] args) throws Exception {
testFutures();
}
private static void testFutures() throws Exception {
CompletableFuture<Void> both = task1().thenCompose(ignoreMe -> task2());
both.get();
System.out.println("both done");
}