CompletableFuture 的实例无法获得预期结果

instance of CompletableFuture cannot get expected result

    CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("enter into completableFuture()");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("start to out of completableFuture()");
        return "a";
    });

    System.out.println("do something else");

    cf1.thenApply(v -> v + " b").thenAcceptAsync(v ->
            System.out.println(v)
    );

    System.out.println("finalize...");

    //cannot get expected result, if this line was comment out.
    //TimeUnit.SECONDS.sleep(10);

代码如上。

在写jdk8中使用CompletableFuture的例子时,一头雾水

我必须添加最后一行

TimeUnit.SECONDS.sleep(10);

得到预期的结果。

如果不让主线程休眠,想知道程序是否已经结束。如果不是,为什么我无法得到输出?

非常感谢您的宝贵时间。

您可以暂停 CompletableFuture 直到 CompletableFuture#join 完成,例如:

CompletableFuture<Void> stage = cf1.thenApply(v -> v + " b").thenAcceptAsync(v ->
        System.out.println(v)
);

System.out.println("finalize...");

//   v--- the main thread wait until the stage is completed
stage.join();

JVM 在没有非守护线程时终止 运行,因此如果异步操作仅由守护线程执行,它会在主线程终止时终止,不会继续后台操作。

有几种方法可以解决这个问题。

  1. 如果后台计算形成一个单一的依赖链,你可以使用最后一个操作等待它完成,因为它的完成意味着前面所有阶段的完成。让主线程等待直到完成将 JVM 的终止推迟到该点:

    CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("enter into completableFuture()");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("start to out of completableFuture()");
        return "a";
    });
    System.out.println("do something else");
    CompletableFuture last
        = cf1.thenApply(v -> v + " b").thenAcceptAsync(System.out::println);
    System.out.println("finalize...");
    last.join();
    
  2. 考虑 documentation of CompletableFuture:

    All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).

    因为 F/J 公共池的 属性 使用守护进程线程,我们可以使用该知识等待所有挂起任务的完成,在这种情况下,它独立于这些未决任务之间的依赖关系:

    CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("enter into completableFuture()");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("start to out of completableFuture()");
        return "a";
    });
    System.out.println("do something else");
    cf1.thenApply(v -> v + " b").thenAcceptAsync(System.out::println);
    System.out.println("finalize...");
    if(ForkJoinPool.getCommonPoolParallelism()>1)
        ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
    
  3. 使用显式执行器,不会使用守护线程。 JRE提供的线程池executor,暂且不说ForkJoinPool,默认使用非daemon线程:

    final ExecutorService threadPool = Executors.newCachedThreadPool();
    CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("enter into completableFuture()");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("start to out of completableFuture()");
        return "a";
    }, threadPool);
    System.out.println("do something else");
    cf1.thenApply(v -> v + " b").thenAcceptAsync(System.out::println);
    System.out.println("finalize...");
    threadPool.shutdown();
    

    请注意,threadPool.shutdown(); 并不意味着等待,也不意味着停止挂起的任务;它只会停止接受新任务,并确保一旦处理完所有未决任务,池线程就会终止。您可以在将它与 supplyAsync 一起使用后直接放置它,而不改变行为。

    因此,第三种解决方案是唯一让 main 线程退出的解决方案,JVM 会继续运行,直到处理完所有待处理的后台任务,因为它们在非守护线程中 运行。