仅在满足条件时链接多个 CompletionStage

Chaining several CompletionStage only if a condition is achieved

我想链接几个 CompletionStage 方法。问题是第一个的结果将决定是否应该执行下一个。现在实现这一目标的唯一方法似乎是将 "special" 参数传递给 next CompletionStage,这样它就不会执行完整的代码。例如:

public enum SomeResult {
    RESULT_1,
    RESULT_2,
    RESULT_3
}

public CompletionStage<SomeResult> someMethod(SomeArgument someArgument) {

    return CompletableFuture.supplyAsync(() -> {
        // loooooong operation
        if (someCondition)
            return validValue;
        else
            return null;
    }).thenCompose(result -> {
        if (result != null)
            return someMethodThatReturnsACompletionStage(result);
        else
            return CompletableFuture.completedFuture(null);
    }).thenApply(result -> {
        if (result == null)
            return ChainingResult.RESULT_1;
        else if (result.someCondition())
            return ChainingResult.RESULT_2;
        else
            return ChainingResult.RESULT_3;
    });
}

因为整个代码依赖于第一个 someCondition(如果它是 false 那么结果将是 RESULT_1,如果不是那么整个代码应该被执行)这个构造看起来对我来说有点丑。有什么方法可以决定是否应该执行第二个(thenCompose(...))和第三个(thenApply(...))方法?

你可以这样做:

public CompletionStage<SomeResult> someMethod(SomeArgument someArgument) {
    CompletableFuture<SomeResult> shortCut = new CompletableFuture<>();
    CompletableFuture<ResultOfFirstOp> withChain = new CompletableFuture<>();

    CompletableFuture.runAsync(() -> {
        // loooooong operation
        if (someCondition)
            withChain.complete(validValue);
        else
            shortCut.complete(SomeResult.RESULT_1);
    });
    return withChain
        .thenCompose(result -> someMethodThatReturnsACompletionStage(result))
        .thenApply(result ->
                   result.someCondition()? SomeResult.RESULT_2: SomeResult.RESULT_3)
        .applyToEither(shortCut, Function.identity());
}

我们创建了两个,而不是一个 CompletableFuture,代表我们可能采用的不同执行路径。然后将 loooooong 操作提交为可运行,并将故意完成其中之一 CompletableFuture。后续阶段链接到表示已满足条件的阶段,然后两个执行路径在最后 applyToEither(shortCut, Function.identity()) 步加入。

shortCut 未来已经有了最终结果的类型,将与 RESULT_1 一起完成,null 传递路径的结果,这将导致立即完成整个操作。如果你不喜欢快捷方式第一阶段和实际结果值之间的依赖关系,你可以这样收回:

public CompletionStage<SomeResult> someMethod(SomeArgument someArgument) {
    CompletableFuture<Object> shortCut = new CompletableFuture<>();
    CompletableFuture<ResultOfFirstOp> withChain = new CompletableFuture<>();

    CompletableFuture.runAsync(() -> {
        // loooooong operation
        if (someCondition)
            withChain.complete(validValue);
        else
            shortCut.complete(null);
    });
    return withChain
        .thenCompose(result -> someMethodThatReturnsACompletionStage(result))
        .thenApply(result ->
                   result.someCondition()? SomeResult.RESULT_2: SomeResult.RESULT_3)
        .applyToEither(shortCut.thenApply(x -> SomeResult.RESULT_1), Function.identity());
}

如果您的第三步不是典型的但看起来与问题中显示的完全一样,您可以将其与代码路径连接步骤合并:

public CompletionStage<SomeResult> someMethod(SomeArgument someArgument) {
    CompletableFuture<ResultOfSecondOp> shortCut = new CompletableFuture<>();
    CompletableFuture<ResultOfFirstOp> withChain = new CompletableFuture<>();

    CompletableFuture.runAsync(() -> {
        // loooooong operation
        if (someCondition)
            withChain.complete(validValue);
        else
            shortCut.complete(null);
    });
    return withChain
        .thenCompose(result -> someMethodThatReturnsACompletionStage(result))
        .applyToEither(shortCut, result -> result==null? SomeResult.RESULT_1:
            result.someCondition()? SomeResult.RESULT_2: SomeResult.RESULT_3);
}

然后我们只跳过第二步,即 someMethodThatReturnsACompletionStage 调用,但它仍然可以代表一长串中间步骤,无需通过 nullcheck 手动跳过所有步骤。

为了完整起见,我添加了一个新答案

虽然@Holger 提出的解决方案效果很好,但对我来说有点奇怪。我一直在使用的解决方案包括在不同的方法调用中分离不同的流程,并将它们与 thenCompose 链接起来:

public enum SomeResult {
    RESULT_1,
    RESULT_2,
    RESULT_3
}

public CompletionStage<SomeResult> someMethod(SomeArgument someArgument) {

    return CompletableFuture.supplyAsync(() -> {
        // loooooong operation
        if (someCondition)
            return operateWithValidValue(value);
        else
            return CompletableFuture.completedValue(ChainingResult.RESULT_1);
    })
        .thenCompose(future -> future);

public CompletionStage<SomeResult> operateWithValidValue(... value) {
     // more loooong operations...
     if (someCondition)
         return CompletableFuture.completedValue(SomeResult.RESULT_2);
     else
         return doFinalOperation(someOtherValue);   
}

public CompletionStage<SomeResult> doFinalOperation(... value) {
     // more loooong operations...
     if (someCondition)
         return CompletableFuture.completedValue(SomeResult.RESULT_2);
     else
         return CompletableFuture.completedValue(SomeResult.RESULT_3);
}

注意:为了得到更完整的答案,我已经更改了问题的算法

所有长操作都可能被包裹在另一个 CompletableFuture.supplyAsync 中,不费吹灰之力

如果您只需检查空值,您可以使用 Optional 解决。例如你应该这样做:

public Bar execute(String id) {

      return this.getFooById(id)
            .thenCompose(this::checkFooPresent)
            .thenCompose(this::doSomethingElse)
            .thenCompose(this::doSomethingElseMore)
            .thenApply(rankRes -> new Bar(foo));

}


private Optional<Foo> getFooById(String id) {

    // some better logic to retrieve foo

    return Optional.ofNullable(foo);
}


private CompletableFuture<Foo> checkFooPresent(Optional<Foo> optRanking) {

    CompletableFuture<Foo> future = new CompletableFuture();
    optRanking.map(future::complete).orElseGet(() -> future.completeExceptionally(new Exception("Foo not present")));
    return future;
}

checkFooPresent() 收到一个 Optional,如果它的值为 null 它异常完成 CompletableFuture

显然您需要管理该异常,但如果您之前设置了 ExceptionHandler 或类似的东西,它应该是免费的。