如何在 CompletionStage.exceptionally 中链接非阻塞操作
How to chain non-blocking action in CompletionStage.exceptionally
我正在 Java 中编写一个 Play2 应用程序服务方法,它应该执行以下操作。异步调用A方法,失败再异步调用B方法。
为了说明假设此接口用于服务调用的后端:
public interface MyBackend {
CompletionStage<Object> tryWrite(Object foo);
CompletionStage<Object> tryCleanup(Object foo);
}
所以在我的服务方法中,我想return一个可以用这些完成的未来:
- tryWrite成功完成
- tryWrite 失败和 tryCleanup 成功完成并失败,但 tryWrite() 除外
(注意:tryWrite()当然可以自己做任何清理,这是一个简单的例子来说明问题)
像这样调用后端的服务的实现对我来说似乎很难,因为 CompletionStage.exceptionally() 方法不允许组合。
版本 1:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
.exceptionally((throwable) -> {
CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
throw new RuntimeException(throwable);
});
return writeFuture;
}
}
因此版本 1 以非阻塞方式调用 tryCleanup(foo),但是由 tryWriteWithCleanup() 编辑的 CompletionStage return 不会等待 cleanupFuture 完成。如何将此代码更改为服务的 return 未来,该服务也会等待 cleanupFuture 完成?
版本 2:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
final AtomicReference<Throwable> saveException = new AtomicReference<>();
CompletionStage<Object> writeFuture = myBackend
.tryWrite(foo)
.exceptionally(t -> {
saveException.set(t);
// continue with cleanup
return null;
})
.thenCompose((nil) -> {
// if no cleanup necessary, return
if (saveException.get() == null) {
return CompletableFuture.completedFuture(null);
}
return CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo)
.exceptionally(cleanupError -> {
// log error
return null;
})
.thenRun(() -> {
throw saveException.get();
});
});
return writeFuture;
}
}
版本 2 使用外部 AtomicReference 来存储失败,如果失败,则在另一个 thenCompose() 块中进行异步第二次调用。
我所有的其他尝试都以失败告终,我不想将它们粘贴在这里。
不幸的是 CompletionStage
/CompletableFuture
不提供异常处理 API 的组合。
您可以通过 handle()
和 BiFunction
以及 returns 和 CompletionStage
来解决这个问题。这将为您提供嵌套阶段 (CompletionStage<CompletionStage<Object>>
),您可以使用 compose(identity())
:
"unnest"
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
return myBackend.tryWrite(foo)
.handle((r, e) -> {
if (e != null) {
return myBackend.tryCleanup(foo)
.handle((r2, e2) -> {
// Make sure we always return the original exception
// but keep track of new exception if any,
// as if run in a finally block
if (e2 != null) {
e.addSuppressed(e2);
}
// wrapping in CompletionException behaves as if
// we threw the original exception
throw new CompletionException(e);
});
}
return CompletableFuture.completedFuture(r);
})
.thenCompose(Function.identity());
}
您可以在处理程序中等待完成:
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
return myBackend.tryWrite(foo).exceptionally(throwable -> {
myBackend.tryCleanup(foo).toCompletableFuture().join();
throw new CompletionException(throwable);
});
}
这会将结果 CompletionStage
的完成推迟到清理阶段的完成。使用 CompletionException
作为包装器将使包装对调用者透明。
但是,它也有一些缺点。虽然框架可能会在等待或生成补偿线程时使用线程,但如果它是工作线程,如果 tryWrite
返回的阶段恰好在进入 [=15 时已经完成,则阻塞的线程可能是调用者线程=].不幸的是,没有 exceptionallyAsync
方法。您可以改用 handleAsync
,但它会使代码复杂化,同时仍然感觉像一个杂乱无章的人。
此外,清理抛出的异常可能会掩盖原始失败。
更简洁的解决方案可能涉及更多:
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
CompletableFuture<Object> writeFuture = new CompletableFuture<>();
myBackend.tryWrite(foo).whenComplete((obj,throwable) -> {
if(throwable==null)
writeFuture.complete(obj);
else
myBackend.tryCleanup(foo).whenComplete((x,next) -> {
try {
if(next!=null) throwable.addSuppressed(next);
}
finally {
writeFuture.completeExceptionally(throwable);
}
});
});
return writeFuture;
}
这只是手动创建了一个 CompletableFuture
,允许控制它的完成,这将通过成功案例中链接到 tryWrite
阶段的动作直接发生,或者通过链接的动作发生在特殊情况下进入清理阶段。请注意,后者负责通过 addSuppressed
.
链接可能的后续清理异常
我正在 Java 中编写一个 Play2 应用程序服务方法,它应该执行以下操作。异步调用A方法,失败再异步调用B方法。
为了说明假设此接口用于服务调用的后端:
public interface MyBackend {
CompletionStage<Object> tryWrite(Object foo);
CompletionStage<Object> tryCleanup(Object foo);
}
所以在我的服务方法中,我想return一个可以用这些完成的未来:
- tryWrite成功完成
- tryWrite 失败和 tryCleanup 成功完成并失败,但 tryWrite() 除外
(注意:tryWrite()当然可以自己做任何清理,这是一个简单的例子来说明问题)
像这样调用后端的服务的实现对我来说似乎很难,因为 CompletionStage.exceptionally() 方法不允许组合。
版本 1:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
.exceptionally((throwable) -> {
CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
throw new RuntimeException(throwable);
});
return writeFuture;
}
}
因此版本 1 以非阻塞方式调用 tryCleanup(foo),但是由 tryWriteWithCleanup() 编辑的 CompletionStage return 不会等待 cleanupFuture 完成。如何将此代码更改为服务的 return 未来,该服务也会等待 cleanupFuture 完成?
版本 2:
public class MyServiceImpl {
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
final AtomicReference<Throwable> saveException = new AtomicReference<>();
CompletionStage<Object> writeFuture = myBackend
.tryWrite(foo)
.exceptionally(t -> {
saveException.set(t);
// continue with cleanup
return null;
})
.thenCompose((nil) -> {
// if no cleanup necessary, return
if (saveException.get() == null) {
return CompletableFuture.completedFuture(null);
}
return CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo)
.exceptionally(cleanupError -> {
// log error
return null;
})
.thenRun(() -> {
throw saveException.get();
});
});
return writeFuture;
}
}
版本 2 使用外部 AtomicReference 来存储失败,如果失败,则在另一个 thenCompose() 块中进行异步第二次调用。
我所有的其他尝试都以失败告终,我不想将它们粘贴在这里。
不幸的是 CompletionStage
/CompletableFuture
不提供异常处理 API 的组合。
您可以通过 handle()
和 BiFunction
以及 returns 和 CompletionStage
来解决这个问题。这将为您提供嵌套阶段 (CompletionStage<CompletionStage<Object>>
),您可以使用 compose(identity())
:
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
return myBackend.tryWrite(foo)
.handle((r, e) -> {
if (e != null) {
return myBackend.tryCleanup(foo)
.handle((r2, e2) -> {
// Make sure we always return the original exception
// but keep track of new exception if any,
// as if run in a finally block
if (e2 != null) {
e.addSuppressed(e2);
}
// wrapping in CompletionException behaves as if
// we threw the original exception
throw new CompletionException(e);
});
}
return CompletableFuture.completedFuture(r);
})
.thenCompose(Function.identity());
}
您可以在处理程序中等待完成:
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
return myBackend.tryWrite(foo).exceptionally(throwable -> {
myBackend.tryCleanup(foo).toCompletableFuture().join();
throw new CompletionException(throwable);
});
}
这会将结果 CompletionStage
的完成推迟到清理阶段的完成。使用 CompletionException
作为包装器将使包装对调用者透明。
但是,它也有一些缺点。虽然框架可能会在等待或生成补偿线程时使用线程,但如果它是工作线程,如果 tryWrite
返回的阶段恰好在进入 [=15 时已经完成,则阻塞的线程可能是调用者线程=].不幸的是,没有 exceptionallyAsync
方法。您可以改用 handleAsync
,但它会使代码复杂化,同时仍然感觉像一个杂乱无章的人。
此外,清理抛出的异常可能会掩盖原始失败。
更简洁的解决方案可能涉及更多:
public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
CompletableFuture<Object> writeFuture = new CompletableFuture<>();
myBackend.tryWrite(foo).whenComplete((obj,throwable) -> {
if(throwable==null)
writeFuture.complete(obj);
else
myBackend.tryCleanup(foo).whenComplete((x,next) -> {
try {
if(next!=null) throwable.addSuppressed(next);
}
finally {
writeFuture.completeExceptionally(throwable);
}
});
});
return writeFuture;
}
这只是手动创建了一个 CompletableFuture
,允许控制它的完成,这将通过成功案例中链接到 tryWrite
阶段的动作直接发生,或者通过链接的动作发生在特殊情况下进入清理阶段。请注意,后者负责通过 addSuppressed
.