如何在 CompletionStage.exceptionally 中链接非阻塞操作

How to chain non-blocking action in CompletionStage.exceptionally

我正在 Java 中编写一个 Play2 应用程序服务方法,它应该执行以下操作。异步调用A方法,失败再异步调用B方法。

为了说明假设此接口用于服务调用的后端:

public interface MyBackend {
    CompletionStage<Object> tryWrite(Object foo);
    CompletionStage<Object> tryCleanup(Object foo);
}

所以在我的服务方法中,我想return一个可以用这些完成的未来:

(注意:tryWrite()当然可以自己做任何清理,这是一个简单的例子来说明问题)

像这样调用后端的服务的实现对我来说似乎很难,因为 CompletionStage.exceptionally() 方法不允许组合。

版本 1:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
            .exceptionally((throwable) -> {
                CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
                throw new RuntimeException(throwable);
        });
        return writeFuture;
    }
}

因此版本 1 以非阻塞方式调用 tryCleanup(foo),但是由 tryWriteWithCleanup() 编辑的 CompletionStage return 不会等待 cleanupFuture 完成。如何将此代码更改为服务的 return 未来,该服务也会等待 cleanupFuture 完成?

版本 2:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        final AtomicReference<Throwable> saveException = new AtomicReference<>();
        CompletionStage<Object> writeFuture = myBackend
            .tryWrite(foo)
            .exceptionally(t -> {
                saveException.set(t);
                // continue with cleanup
                return null;
            })
            .thenCompose((nil) -> {
                // if no cleanup necessary, return
                if (saveException.get() == null) {
                    return CompletableFuture.completedFuture(null);
                }
                return CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo)
                    .exceptionally(cleanupError -> {
                        // log error
                        return null;
                    })
                    .thenRun(() -> {
                        throw saveException.get();
                    });
        });
        return writeFuture;
    }
}

版本 2 使用外部 AtomicReference 来存储失败,如果失败,则在另一个 thenCompose() 块中进行异步第二次调用。

我所有的其他尝试都以失败告终,我不想将它们粘贴在这里。

不幸的是 CompletionStage/CompletableFuture 不提供异常处理 API 的组合。

您可以通过 handle()BiFunction 以及 returns 和 CompletionStage 来解决这个问题。这将为您提供嵌套阶段 (CompletionStage<CompletionStage<Object>>),您可以使用 compose(identity()):

"unnest"
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
    return myBackend.tryWrite(foo)
            .handle((r, e) -> {
                if (e != null) {
                    return myBackend.tryCleanup(foo)
                            .handle((r2, e2) -> {
                                // Make sure we always return the original exception
                                // but keep track of new exception if any,
                                // as if run in a finally block
                                if (e2 != null) {
                                    e.addSuppressed(e2);
                                }
                                // wrapping in CompletionException  behaves as if
                                // we threw the original exception
                                throw new CompletionException(e);
                            });
                }
                return CompletableFuture.completedFuture(r);
            })
            .thenCompose(Function.identity());
}

您可以在处理程序中等待完成:

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
    return myBackend.tryWrite(foo).exceptionally(throwable -> {
        myBackend.tryCleanup(foo).toCompletableFuture().join();
        throw new CompletionException(throwable);
    });
}

这会将结果 CompletionStage 的完成推迟到清理阶段的完成。使用 CompletionException 作为包装器将使包装对调用者透明。

但是,它也有一些缺点。虽然框架可能会在等待或生成补偿线程时使用线程,但如果它是工作线程,如果 tryWrite 返回的阶段恰好在进入 [=15 时已经完成,则阻塞的线程可能是调用者线程=].不幸的是,没有 exceptionallyAsync 方法。您可以改用 handleAsync,但它会使代码复杂化,同时仍然感觉像一个杂乱无章的人。

此外,清理抛出的异常可能会掩盖原始失败。

更简洁的解决方案可能涉及更多:

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

    CompletableFuture<Object> writeFuture = new CompletableFuture<>();

    myBackend.tryWrite(foo).whenComplete((obj,throwable) -> {
        if(throwable==null)
            writeFuture.complete(obj);
        else
            myBackend.tryCleanup(foo).whenComplete((x,next) -> {
                try {
                    if(next!=null) throwable.addSuppressed(next);
                }
                finally {
                    writeFuture.completeExceptionally(throwable);
                }
        });
    });
    return writeFuture;
}

这只是手动创建了一个 CompletableFuture,允许控制它的完成,这将通过成功案例中链接到 tryWrite 阶段的动作直接发生,或者通过链接的动作发生在特殊情况下进入清理阶段。请注意,后者负责通过 addSuppressed.

链接可能的后续清理异常