从未来获得结果而不完成所有
Get result from future without completing all
我正在创建 10 个线程并向它们添加 2 种作业,如下所示。
public class ParallelAdder {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<Integer>> list = new ArrayList<Future<Integer>>();
for (int i = 0; i < 10; i++) {
Future<Integer> future;
if (i % 2 == 0) {
future = executor.submit(new Call1());
} else {
future = executor.submit(new Call2());
}
list.add(future);
}
for(Future<Integer> fut : list) {
System.out.println("From future is "+fut.get());
}
}
}
class Call1 implements Callable<Integer> {
public Integer call() throws Exception {
System.out.println("Calling 1");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}
}
class Call2 implements Callable<Integer> {
public Integer call() throws Exception {
System.out.println("Calling 2");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
}
}
与 Call1 相比,Call2 作业 returns 更快。
在我未来的清单中,我希望工作一完成就得到结果。它不应该依赖于所有其他工作来完成。
这里 Call2 return 正在等待 Call1。如何解决?
问题是您在此处调用阻塞 get
等待:
for(Future<Integer> fut : list) {
System.out.println("From future is "+fut.get());
}
要解决这个问题,您需要使用响应式代码。您可以使用可完成的未来 API,它是为声明式反应式未来设计的 API:
ExecutorService executor = Executors.newFixedThreadPool(10);
Supplier<Integer> call1Supplier = () -> {
System.out.println("Calling 1");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
};
Supplier<Integer> call2Supplier = () -> {
System.out.println("Calling 1");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
};
然后可以将其提交给同一个执行程序服务,但使用支持回调类对象的反应式 CompletableFuture
。
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(call1Supplier, executor)
.thenAccept(number -> System.out.println("From future is " + number));
futures.add(future);
} else {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(call2Supplier, executor)
.thenAccept(number -> System.out.println("From future is " + number));
futures.add(future);
}
}
以下只是为了确保当前线程不会在所有异步任务完成之前退出。但如果是长运行的应用,比如服务器,这可能是不必要的
for (CompletableFuture<Void> future : futures) {
future.join();
}
请注意,我内联了 Call1
和 Call2
中的代码,因为 Callable
实现不是必需的。但是把它放在一个单独的 class 中仍然是个好主意(除非函数对象刚刚好)。
我正在创建 10 个线程并向它们添加 2 种作业,如下所示。
public class ParallelAdder {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<Integer>> list = new ArrayList<Future<Integer>>();
for (int i = 0; i < 10; i++) {
Future<Integer> future;
if (i % 2 == 0) {
future = executor.submit(new Call1());
} else {
future = executor.submit(new Call2());
}
list.add(future);
}
for(Future<Integer> fut : list) {
System.out.println("From future is "+fut.get());
}
}
}
class Call1 implements Callable<Integer> {
public Integer call() throws Exception {
System.out.println("Calling 1");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}
}
class Call2 implements Callable<Integer> {
public Integer call() throws Exception {
System.out.println("Calling 2");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
}
}
与 Call1 相比,Call2 作业 returns 更快。
在我未来的清单中,我希望工作一完成就得到结果。它不应该依赖于所有其他工作来完成。
这里 Call2 return 正在等待 Call1。如何解决?
问题是您在此处调用阻塞 get
等待:
for(Future<Integer> fut : list) {
System.out.println("From future is "+fut.get());
}
要解决这个问题,您需要使用响应式代码。您可以使用可完成的未来 API,它是为声明式反应式未来设计的 API:
ExecutorService executor = Executors.newFixedThreadPool(10);
Supplier<Integer> call1Supplier = () -> {
System.out.println("Calling 1");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
};
Supplier<Integer> call2Supplier = () -> {
System.out.println("Calling 1");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
};
然后可以将其提交给同一个执行程序服务,但使用支持回调类对象的反应式 CompletableFuture
。
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(call1Supplier, executor)
.thenAccept(number -> System.out.println("From future is " + number));
futures.add(future);
} else {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(call2Supplier, executor)
.thenAccept(number -> System.out.println("From future is " + number));
futures.add(future);
}
}
以下只是为了确保当前线程不会在所有异步任务完成之前退出。但如果是长运行的应用,比如服务器,这可能是不必要的
for (CompletableFuture<Void> future : futures) {
future.join();
}
请注意,我内联了 Call1
和 Call2
中的代码,因为 Callable
实现不是必需的。但是把它放在一个单独的 class 中仍然是个好主意(除非函数对象刚刚好)。