Java 8 运行 多个并行方法

Java 8 Running Multiple Methods In Parallel

我有 2 个方法,它们具有不同的 return 类型,我想同时 运行。这是我的代码:

public void method(int id) {
    final CompletableFuture<List<FooA>> fooACF = CompletableFuture.supplyAsync(() -> generateFooA(id));
    final CompletableFuture<List<FooB>> fooBCF = CompletableFuture.supplyAsync(() -> generateFooB(id));
    List<FooA> fooAs = fooACF.get();
    List<FooB> fooBs = fooBCF.get();
    //Do more processesing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

但我不知道这两种方法是否会 运行 与上面的代码并行,或者我是否最好说:

List<FooA> fooAs = generateFooA(id);
List<FooB> fooBs = generateFooB(id);

如何正确使用可完成的期货才能运行这两种方法并行?

正如我在评论中所说,查看 How to start two threads at "exactly" the same time 但这应该是您要找的东西

final CyclicBarrier gate = new CyclicBarrier(3);
public void method(int id) {
    Thread one = new Thread (()->{
        gate.await();
        List<FooA> fooAs = generateFooA(id);
    });
    Thread two = new Thread (()->{
        gate.await();
        List<FooB> fooBs = generateFooB(id);
    });
    one.start();
    two.start();
    gate.await();
    //Do more processesing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

您缺少 Executor:

ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> = Stream.<Runnable>of(() -> generateFooA(id), () -> generateFooA(id))
        .map(executor::submit)
        .collect(Collectors.toList());
for (Future<?> future : futures) {
    future.get(); // do whatever you need here
}

当你submit他们时,Runnables开始执行。 get() return 尽快。例如,如果您 get() 的第一个未来是最慢的,所有其他 get() 调用将立即 return。

您的代码工作正常,使用 ForkJoinPool.commonPool() 提供的线程,正如 JavaDoc 为 CompletableFuture.supplyAsync(Supplier<U> supplier) 所承诺的那样。您可以通过添加一些 sleep()println() 语句以快速而简单的方式证明它。我通过使用 String 而不是 List<Foo>:

稍微简化了您的代码
public void method(int id) throws InterruptedException, ExecutionException {
    CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(id));
    CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(id));
    String fooA = cfa.get();
    String fooB = cfb.get();
    System.out.println("Final fooA " + fooA);
    System.out.println("Final fooB " + fooB);
}

public String generateA(int id) {
    System.out.println("Entering generateA " + Thread.currentThread());
    sleep(2000);
    System.out.println("Leaving generateA");
    return "A" + id;
}

public String generateB(int id) {
    System.out.println("Entering generateB " + Thread.currentThread());
    sleep(1000);
    System.out.println("Leaving generateB");
    return "B" + id;
}

private void sleep(int n) {
    try {
        Thread.sleep(n);
    } catch (InterruptedException ex) {
        // never mind
    }
}

输出为:

Entering generateFooA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Entering generateFooB Thread[ForkJoinPool.commonPool-worker-2,5,main]
Leaving generateFooB
Leaving generateFooA
Final fooA A1
Final fooB B1

可以手动观察1秒2秒后出现"Leaving"输出行。如需更多证据,您可以在输出中添加时间戳。如果您更改睡眠的相对长度,您会看到 "Leaving" 输出以不同的顺序出现。


如果您省略 sleep()s,那么第一个线程很可能会很快完成以致于在第二个线程开始之前就已完成:

Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateA
Entering generateB Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateB
Final fooA A1
Final fooB B1

请注意,这一切发生得如此之快,以至于在运行时请求第二个线程时线程已返回到池中。所以原来的线程被复用了。

这可能也发生在非常短的睡眠中,尽管在我的系统上每次 运行 睡眠 1 毫秒就足够了。当然 sleep() 是需要时间才能完成的 "real" 操作的占位符。如果您的实际操作非常便宜,以至于它在另一个线程启动之前就完成了,那么这是一个很好的提示,表明在这种情况下多线程是无益的。


但是如果你需要问如何证明事情是同时发生的,我想知道你为什么要让它们同时发生。如果您的程序在并发或顺序执行这些任务时没有 "real world" 可观察到的差异,那么为什么不按顺序保留它 运行 呢?更容易推理顺序操作;有很多与并发相关的偷偷摸摸的错误。

也许您希望通过多线程提高速度——如果是这样的话,速度的提高应该是您要测量的,而不是事情是否实际上是并发的。请记住,对于大量任务,CPU 并行执行它们的速度不会比顺序执行的快。