CompletableFuture | thenApply vs thenCompose

I can't get my head around the difference between thenApply and thenCompose.

So, could someone provide a valid use case?

From the Java docs:

thenApply(Function<? super T,? extends U> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.

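I get that the second type argument of the Function passed to thenCompose extends CompletionStage, whereas for thenApply it does not.

Could someone provide an example of when I have to use thenApply and when thenCompose?

thenApply is used if you have a synchronous mapping function.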

CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenApply(x -> x+1);

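thenCompose is used if you have an asynchronous mapping function (i.e. one that returns a CompletableFuture). It will then return a future with the result directly, rather than a nested future.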

CompletableFuture<Integer> future = 
    CompletableFuture.supplyAsync(() -> 1)
                     .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));

The updated Javadocs in Java 9 will probably help to understand it better:

thenApply

<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn)

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

This method is analogous to Optional.map and Stream.map.

See the CompletionStage documentation for rules covering exceptional completion.

thenCompose

<U> CompletionStage<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Returns a new CompletionStage that is completed with the same value as the CompletionStage returned by the given function.

When this stage completes normally, the given function is invoked with this stage's result as the argument, returning another CompletionStage. When that stage completes normally, the CompletionStage returned by this method is completed with the same value.

To ensure progress, the supplied function must arrange eventual completion of its result.

This method is analogous to Optional.flatMap and Stream.flatMap.

See the CompletionStage documentation for rules covering exceptional completion.
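To make the map/flatMap analogy from the Javadoc concrete, here is a small sketch that puts Optional and CompletableFuture side by side. The helpers parsePositive and lengthAsync are made up for illustration and are not part of any library.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class MapVsFlatMapAnalogy {

    // Hypothetical helper: a conversion that may or may not produce a value.
    static Optional<Integer> parsePositive(String s) {
        try {
            int n = Integer.parseInt(s);
            return n > 0 ? Optional.of(n) : Optional.empty();
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Hypothetical helper: a computation whose value arrives asynchronously.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    public static void main(String[] args) {
        // map / thenApply: the function returns a plain value.
        Optional<Integer> mapped = Optional.of("42").map(String::length);
        CompletableFuture<Integer> applied =
                CompletableFuture.completedFuture("42").thenApply(String::length);

        // flatMap / thenCompose: the function returns the wrapper type itself,
        // and the result is not wrapped a second time.
        Optional<Integer> flatMapped =
                Optional.of("42").flatMap(MapVsFlatMapAnalogy::parsePositive);
        CompletableFuture<Integer> composed =
                CompletableFuture.completedFuture("42").thenCompose(MapVsFlatMapAnalogy::lengthAsync);

        System.out.println(mapped.get());      // 2
        System.out.println(applied.join());    // 2
        System.out.println(flatMapped.get());  // 42
        System.out.println(composed.join());   // 2
    }
}
```

In both pairs the second form only differs in that the supplied function already returns an Optional or a CompletableFuture.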

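I think the answer posted by @Joe C is misleading.

Let me try to explain the difference between thenApply and thenCompose with an example.

Let's suppose that we have 2 methods: getUserInfo(int userId) and getUserRating(UserInfo userInfo):

public CompletableFuture<UserInfo> getUserInfo(int userId)

public CompletableFuture<UserRating> getUserRating(UserInfo userInfo)

Both methods return a CompletableFuture.

We want to call getUserInfo() first and, on its completion, call getUserRating() with the resulting UserInfo.

On completion of getUserInfo(), let's try both thenApply and thenCompose. The difference is in the return types: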

CompletableFuture<CompletableFuture<UserRating>> f =
    userInfo.thenApply(this::getUserRating);

CompletableFuture<UserRating> relevanceFuture =
    userInfo.thenCompose(this::getUserRating);

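thenCompose() works like Scala's flatMap, which flattens nested futures.

thenApply() returns the nested futures as they are, whereas thenCompose() flattens the nested CompletableFutures so that it is easier to chain more method calls onto the result.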
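For anyone who wants to run this, a minimal self-contained sketch could look like the following. The UserInfo and UserRating classes and the bodies of the two methods are assumptions made up for the example; only the method signatures come from the answer.

```java
import java.util.concurrent.CompletableFuture;

class UserRatingExample {

    // Hypothetical stand-ins for the UserInfo and UserRating types.
    static final class UserInfo {
        final int userId;
        UserInfo(int userId) { this.userId = userId; }
    }

    static final class UserRating {
        final int stars;
        UserRating(int stars) { this.stars = stars; }
    }

    // Assumed implementations: both simply compute a value on another thread.
    static CompletableFuture<UserInfo> getUserInfo(int userId) {
        return CompletableFuture.supplyAsync(() -> new UserInfo(userId));
    }

    static CompletableFuture<UserRating> getUserRating(UserInfo userInfo) {
        return CompletableFuture.supplyAsync(() -> new UserRating(userInfo.userId % 5 + 1));
    }

    public static void main(String[] args) {
        // thenApply wraps the future returned by getUserRating in another future.
        CompletableFuture<CompletableFuture<UserRating>> nested =
                getUserInfo(7).thenApply(UserRatingExample::getUserRating);

        // thenCompose flattens it, so later stages see a UserRating directly.
        CompletableFuture<Integer> stars =
                getUserInfo(7)
                        .thenCompose(UserRatingExample::getUserRating)
                        .thenApply(rating -> rating.stars);

        System.out.println(nested.join().join().stars); // two joins needed
        System.out.println(stars.join());               // one join is enough
    }
}
```

Note how the flattened version can keep chaining thenApply on the rating itself, while the nested version would first have to unwrap the inner future.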

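thenApply and thenCompose are methods of CompletableFuture. Use them when you intend to do something to a CompletableFuture's result with a Function.

thenApply and thenCompose both return a CompletableFuture as their own result. You can chain multiple thenApply or thenCompose calls together. Supply a Function for each call; its result will be the input to the next Function.

The Function you supply sometimes needs to do something synchronously. The return type of your Function should then be a non-Future type. In this case you should use thenApply.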

CompletableFuture.completedFuture(1)
    .thenApply(x -> x + 1) // adding one to the result synchronously, returns Integer
    .thenAccept(y -> System.out.println(y)); // value of y is 1 + 1 = 2

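Other times you may want to do asynchronous processing in this Function. In that case you should use thenCompose. The return type of your Function should be a CompletionStage. The next Function in the chain will get the result of that CompletionStage as input, thus unwrapping the CompletionStage.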

// addOneAsync may be implemented by using another thread, or calling a remote method
abstract CompletableFuture<Integer> addOneAsync(int input);

CompletableFuture.completedFuture(1)
    .thenCompose(x -> addOneAsync(x)) // doing something asynchronous, returns CompletableFuture<Integer>
    .thenAccept(y -> System.out.println(y)); // y is an Integer, the result of the CompletableFuture<Integer> above

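This is similar to JavaScript's Promise. Promise.then can accept a function that returns either a value or a Promise of a value. The reason these two methods have different names in Java is generic erasure: Function<? super T,? extends U> fn and Function<? super T,? extends CompletionStage<U>> fn are considered the same runtime type, Function. Thus thenApply and thenCompose have to be named distinctly, or the Java compiler would complain about identical method signatures. The end result is that JavaScript's Promise.then is implemented in two parts in Java: thenApply and thenCompose.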
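One way to see the erasure argument for yourself is to look the two methods up reflectively. This is only a sketch; the class ErasureCheck and its helper erasedParam are made up for the demonstration, while Class.getMethod and Method.getParameterTypes are standard reflection calls.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ErasureCheck {

    // Looks up a public one-argument method on CompletableFuture by name,
    // using Function.class as the (erased) parameter type, and returns
    // the parameter type the JVM actually records for it.
    static Class<?> erasedParam(String methodName) {
        try {
            Method m = CompletableFuture.class.getMethod(methodName, Function.class);
            return m.getParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Both lookups succeed with the very same parameter type, so a single
        // overloaded name would yield two methods with the same erasure,
        // which the compiler rejects.
        System.out.println(erasedParam("thenApply") == Function.class);
        System.out.println(erasedParam("thenCompose") == Function.class);
    }
}
```

At runtime nothing distinguishes the two parameter types, which is why the distinction has to live in the method name.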

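If you are also confused about the related method thenApplyAsync, it is worth reading up on that too.

thenCompose() is better for chaining CompletableFutures.

thenApply() is better for transforming the result of a CompletableFuture.

You can achieve your goal using either technique, but each one is better suited to one of the two use cases.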

public CompletableFuture<Integer> process(Integer i) {
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(
            () -> new HeavyTask(i).execute());
    return completableFuture;
}

@SneakyThrows
public CompletableFuture<Integer> thenApplyVsThenCompose() {
    // the function passed to thenApply() returns a future itself,
    // so you end up with nested futures;
    // you can think of it like map() on java.util.Optional
    CompletableFuture<Future<Integer>> cf1 = CompletableFuture.supplyAsync(
            () -> new HeavyTask().execute())
            .thenApply(i -> process(i));

    // to get the result you need nested get() calls
    Integer resultFromThenApply = cf1.get().get();

    // when calling thenCompose() the nested futures are flattened;
    // you can think of it like flatMap() on java.util.Optional
    CompletableFuture<Integer> cf2;
    cf2 = CompletableFuture.supplyAsync(
            () -> new HeavyTask().execute())
            .thenCompose(this::process);

    // a single get() is enough because thenCompose flattened the result
    Integer resultFromThenCompose = cf2.get();
    return cf2;
} 

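Another problem that helps to visualize the difference between the two:

How would you implement a solution when you don't know how many times you have to apply thenApply()/thenCompose() (for example in a recursive method)?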

public void nested() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> completableFutureToCompose = CompletableFuture.completedFuture(1);
    for (int i = 0; i < 10; i++) {
        log.info("Composing");
        completableFutureToCompose = completableFutureToCompose.thenCompose(this::process);
    }
    completableFutureToCompose.get();

    // not achievable with thenApply alone: each iteration has to call get()
    // to unwrap the nested future, which blocks the calling thread
    CompletableFuture<Integer> completableFutureToApply = CompletableFuture.completedFuture(1);
    for (int i = 0; i < 10; i++) {
        log.info("Applying");
        completableFutureToApply = completableFutureToApply.thenApply(this::process).get();
    }
    completableFutureToApply.get();
}

public CompletableFuture<Integer> process(Integer i) {
    log.info("PROCESSING");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(
            () -> new HeavyTask(i).execute());
    return completableFuture;
}
  • With composing, you first build the recipe for how each future feeds into the next, and only then execute it
  • With apply, you execute the logic after each apply invocation
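A self-contained variant of the composing loop might look like the sketch below. addOneAsync is a hypothetical stand-in for process(i), since HeavyTask is not shown in the answer.

```java
import java.util.concurrent.CompletableFuture;

class ChainInLoop {

    // Hypothetical asynchronous step, standing in for process(i).
    static CompletableFuture<Integer> addOneAsync(int input) {
        return CompletableFuture.supplyAsync(() -> input + 1);
    }

    // Chains addOneAsync 'times' times without blocking inside the loop.
    static CompletableFuture<Integer> addAsync(int start, int times) {
        CompletableFuture<Integer> chain = CompletableFuture.completedFuture(start);
        for (int i = 0; i < times; i++) {
            // The type stays CompletableFuture<Integer>, so reassignment works.
            chain = chain.thenCompose(ChainInLoop::addOneAsync);
        }
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(addAsync(1, 10).join()); // 11
    }
}
```

With thenApply the variable's type would gain one level of nesting per iteration, so the same reassignment would not type-check without unwrapping.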

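The answers above are correct, but I think a better comparison for clarifying the purpose of thenCompose is the comparison between thenApply and thenApply! Once when a synchronous mapping is passed to it, and once when an asynchronous mapping is passed to it.

If the mapping passed to thenApply returns a String (a non-future, so the mapping is synchronous), then its result will be a CompletableFuture<String>. Similarly, what will the result of thenApply be when the mapping passed to it returns a CompletableFuture<String> (a future, so the mapping is asynchronous)? The end result will be a CompletableFuture<CompletableFuture<String>>, which is unnecessary nesting (a future of a future is still a future!). This is where we can use thenCompose to "compose" (nest) multiple asynchronous tasks in each other without getting futures nested in the result.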
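The "future of a future is still a future" remark can be sketched in code: a nested future can always be flattened after the fact by composing with the identity function. The names shoutAsync and flatten are invented for this example.

```java
import java.util.concurrent.CompletableFuture;

class FlattenNested {

    // Hypothetical asynchronous mapping from a String to a String.
    static CompletableFuture<String> shoutAsync(String s) {
        return CompletableFuture.supplyAsync(() -> s.toUpperCase() + "!");
    }

    // Removes one level of nesting by composing with the identity function.
    static CompletableFuture<String> flatten(CompletableFuture<CompletableFuture<String>> nested) {
        return nested.thenCompose(inner -> inner);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("hello");

        // Synchronous mapping: thenApply gives CompletableFuture<String>.
        CompletableFuture<String> sync = source.thenApply(String::toUpperCase);

        // Asynchronous mapping passed to thenApply: one level of nesting appears.
        CompletableFuture<CompletableFuture<String>> nested =
                source.thenApply(FlattenNested::shoutAsync);

        // Flattening afterwards yields the same value as
        // source.thenCompose(FlattenNested::shoutAsync) would.
        CompletableFuture<String> flat = flatten(nested);

        System.out.println(sync.join()); // HELLO
        System.out.println(flat.join()); // HELLO!
    }
}
```

So thenCompose(fn) can be read as thenApply(fn) followed by that flattening step, done in one call.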

private void test1() throws ExecutionException, InterruptedException {

    // thenApply: the function returns a plain value, which becomes the result of the returned future
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
            .thenApply((x) -> {
                x = x + 1;
                log.info("thenApply, 1, x:{}", x);
                return x;
            });

    System.out.println(future.get());
}

// thenCompose: the function itself returns a new CompletableFuture, whose result becomes the result of the returned future
private void test2() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
            .thenCompose((x) -> {
                return CompletableFuture.supplyAsync(() -> {
                    Integer y = x + 1;
                    log.info("thenCompose, 1, x:{}", y);
                    return y;
                });
            });

    System.out.println(future.get());
}

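If you are still confused about what really differs in the code when using thenApply vs thenCompose, and what a nested future looks like, then please look at this full working example.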

package com.graphql.demo.garphqlapi;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ComposeVsThenApply {

    public static void main(String[] args) {
        ComposeVsThenApply cva = new ComposeVsThenApply();
        // thenCompose usage: flattens the return type when one asynchronous step depends on another.
        System.out.println("Starting thenCompose demo");
        CompletableFuture<StockRating> flattenedFuture = cva.getStockDetails("Apple").thenCompose((stock) -> {
            return cva.getRating(stock);
        });
        // Retrieve results
        try {
            StockRating stockViaThenCompose = flattenedFuture.get();
            System.out.println("\n\t\tStock summary :" + stockViaThenCompose.getStockName() + ", Rating :" + stockViaThenCompose.getRating());
            System.out.println("--- thenCompose demo ended ---");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // thenApply: good for transforming a result, but awkward when one asynchronous step depends on another, because you get a nested future.
        System.out.println("\n\n\nStarting thenApply demo");
        CompletableFuture<CompletableFuture<StockRating>> nestedFuture = cva.getStockDetails("Apple").thenApply((stock) -> {
            return cva.getRating(stock);
        });
        // Retrieve results
        try {
            StockRating stockrViaThenApply = nestedFuture.get().get();
            System.out.println("\n\t\tStock summary :" + stockrViaThenApply.getStockName() + ", Rating :" + stockrViaThenApply.getRating());
            System.out.println("--- thenApply demo ended---");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    class Stock {
        private String name;

        public Stock(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

    class StockRating {
        private Double rating;

        public boolean isBuyCall() {
            return buyCall;
        }

        public void setBuyCall(boolean buyCall) {
            this.buyCall = buyCall;
        }

        public String getStockName() {
            return stockName;
        }

        public void setStockName(String stockName) {
            this.stockName = stockName;
        }

        private boolean buyCall;

        public StockRating(Double rating, boolean buyCall, String stockName) {
            this.rating = rating;
            this.buyCall = buyCall;
            this.stockName = stockName;
        }

        private String stockName;

        public StockRating(Double rating) {
            this.rating = rating;
        }

        public Double getRating() {
            return rating;
        }

        public void setRating(Double rating) {
            this.rating = rating;
        }
    }

    class StockSupplier implements Supplier<Stock> {
        private String name;

        public StockSupplier(String name) {
            this.name = name;
        }

        @Override
        public Stock get() {
            try {
                System.out.println("\n\t\tRetrieving details for " + this.name);
                TimeUnit.SECONDS.sleep(4);
                System.out.println("\n\t\tDone with details retrieval for " + this.name);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Stock(name);
        }
    }

    class RatingSupplier implements Supplier<StockRating> {
        private Stock stock;

        public RatingSupplier(Stock stock) {
            this.stock = stock;
        }

        @Override
        public StockRating get() {
            try {
                System.out.println("\n\t\tRetrieving stock rating for " + this.stock.getName());
                TimeUnit.SECONDS.sleep(4);
                System.out.println("\n\t\tDone with rating retrieval for " + this.stock.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new StockRating(10d, true, this.stock.getName());
        }
    }

    public CompletableFuture<Stock> getStockDetails(String name) {
        return CompletableFuture.supplyAsync(new StockSupplier(name));
    }

    public CompletableFuture<StockRating> getRating(Stock stock) {
        return CompletableFuture.supplyAsync(new RatingSupplier(stock));
    }

    public String getSummeryReport(Stock stock, StockRating sr) {
        return stock.getName() + "\n " + sr.getRating();
    }
}

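My understanding is that if you want to do more complex orchestration based on the result of the previous step, thenCompose has an advantage over thenApply.

The example below takes the result of the first step, computes on it in two different places, and uses whichever returns first. It shows the difference between the two: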

    CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> 1);
    // thenCompose
    System.out.println(result.thenCompose(i -> CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> i + 1), CompletableFuture.supplyAsync(() -> i + 2))).join());
    System.out.println(result.thenCompose(i -> CompletableFuture.supplyAsync(() -> i + 1).applyToEither(CompletableFuture.supplyAsync(() -> i + 2), j -> j)).join());
    // ----- thenApply
    System.out.println(result.thenApply(i -> CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> i + 1), CompletableFuture.supplyAsync(() -> i + 2)).join()).join());
    System.out.println(result.thenApply(i -> CompletableFuture.supplyAsync(() -> i + 1).applyToEither(CompletableFuture.supplyAsync(() -> i + 2), j -> j).join()).join());