使用默认值中断 CompletableFuture
Interrupt CompletableFuture with default value
假设我有 3 个服务。
首先我调用 serviceA
,它 return 是 CompletableFuture
。
之后我用结果调用 serviceB
和 serviceC
paralelly (thenCompose()
)。
在我获得所有结果后,我想将所有 3 个结果组合起来,并将其 return 发送给某个来电者。
在调用者中,我想等待整个过程的总 X 毫秒,以便:
- 如果我在
serviceA
调用正在进行时中断进程:抛出一些异常(所以这是强制性的)
- 如果我在
serviceB
和 serviceC
调用正在进行时中断进程:return 一些默认值(它们是可选的)。
这就是我尝试使用 CompletableFuture
的 getNow(fallback)
方法的原因
请检查下面我的代码片段,如果我在 serviceB
和 serviceC
调用中使用长延迟,我总是以 TimeoutException
结束。
我该怎么做?
public CompletableFuture<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
return CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(
resultA.join(),
resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
public Result extractFuture(CompletableFuture<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
} catch (InterruptedException | TimeoutException ex) {
// I always ends up here...
}
return result;
}
由.allOf(resultB, resultC)
编辑的未来return仅在resultB
和resultC
都完成时才完成,因此,依赖函数ignoredVoid -> combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC)
将仅在 resultB
和 resultC
完成时进行评估,并且提供回退根本没有效果。
通常不可能对这些函数中的 get()
调用做出反应。考虑到未来在不同时间可以有任意数量的 get()
调用和不同的超时,这应该是显而易见的,但是传递给 thenApply
的函数只被评估一次。
在 getFuture()
内处理消费者指定超时的唯一方法是将其更改为 return 接收超时的函数:
interface FutureFunc<R> {
R get(long time, TimeUnit u) throws ExecutionException;
}
public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join()));
return (t,u) -> {
try {
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};
}
public Result extractFuture(FutureFunc<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
}
return result;
}
现在,可以进行具有不同超时的不同调用,只要 B 或 C 尚未完成,结果就可能不同。并不是说 combine
方法存在一些歧义,这可能也需要一些时间。
您可以将函数更改为
return (t,u) -> {
try {
if(resultB.isDone() && resultC.isDone()) return optimistic.get();
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};
等待完成可能已经运行combine
。在任何一种情况下,都不能保证在指定时间内交付结果,因为即使使用 B 和 C 的回退值,也会执行 combine
,这可能需要任意时间.
如果您想要类似取消的行为,即所有结果查询 return 相同的结果,即使它是使用第一个查询的回退值计算的,您也可以改用
public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
CompletableFuture<Result> result = bAndC
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(),
resultC.join()));
return (t,u) -> {
try {
bAndC.get(t, u);
} catch (InterruptedException|TimeoutException ex) {
resultB.complete(fallbackB);
resultC.complete(fallbackC);
}
try {
return result.get();
} catch (InterruptedException ex) {
throw new ExecutionException(ex);
}
};
}
有了这个,单个 FutureFunc
上的所有查询将一致地 return 相同的结果,即使它是基于第一次超时的回退值。此变体还始终将 combine
的执行从超时中排除。
当然,如果根本不需要不同的超时时间,您可以重构 getFuture()
以提前获得所需的超时时间,例如作为参数。这将大大简化实施,并且可以 return 再次成为未来:
public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.schedule(() -> resultB.complete(fallbackB), timeOut, u);
e.schedule(() -> resultC.complete(fallbackC), timeOut, u);
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
bAndC.thenRun(e::shutdown);
return bAndC.thenApply(ignoredVoid ->
combine(resultA.join(), resultB.join(), resultC.join()));
}
假设我有 3 个服务。
首先我调用 serviceA
,它 return 是 CompletableFuture
。
之后我用结果调用 serviceB
和 serviceC
paralelly (thenCompose()
)。
在我获得所有结果后,我想将所有 3 个结果组合起来,并将其 return 发送给某个来电者。
在调用者中,我想等待整个过程的总 X 毫秒,以便:
- 如果我在
serviceA
调用正在进行时中断进程:抛出一些异常(所以这是强制性的) - 如果我在
serviceB
和serviceC
调用正在进行时中断进程:return 一些默认值(它们是可选的)。 这就是我尝试使用CompletableFuture
的
getNow(fallback)
方法的原因
请检查下面我的代码片段,如果我在 serviceB
和 serviceC
调用中使用长延迟,我总是以 TimeoutException
结束。
我该怎么做?
public CompletableFuture<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
return CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(
resultA.join(),
resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
public Result extractFuture(CompletableFuture<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
} catch (InterruptedException | TimeoutException ex) {
// I always ends up here...
}
return result;
}
由.allOf(resultB, resultC)
编辑的未来return仅在resultB
和resultC
都完成时才完成,因此,依赖函数ignoredVoid -> combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC)
将仅在 resultB
和 resultC
完成时进行评估,并且提供回退根本没有效果。
通常不可能对这些函数中的 get()
调用做出反应。考虑到未来在不同时间可以有任意数量的 get()
调用和不同的超时,这应该是显而易见的,但是传递给 thenApply
的函数只被评估一次。
在 getFuture()
内处理消费者指定超时的唯一方法是将其更改为 return 接收超时的函数:
interface FutureFunc<R> {
R get(long time, TimeUnit u) throws ExecutionException;
}
public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join()));
return (t,u) -> {
try {
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};
}
public Result extractFuture(FutureFunc<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
}
return result;
}
现在,可以进行具有不同超时的不同调用,只要 B 或 C 尚未完成,结果就可能不同。并不是说 combine
方法存在一些歧义,这可能也需要一些时间。
您可以将函数更改为
return (t,u) -> {
try {
if(resultB.isDone() && resultC.isDone()) return optimistic.get();
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};
等待完成可能已经运行combine
。在任何一种情况下,都不能保证在指定时间内交付结果,因为即使使用 B 和 C 的回退值,也会执行 combine
,这可能需要任意时间.
如果您想要类似取消的行为,即所有结果查询 return 相同的结果,即使它是使用第一个查询的回退值计算的,您也可以改用
public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
CompletableFuture<Result> result = bAndC
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(),
resultC.join()));
return (t,u) -> {
try {
bAndC.get(t, u);
} catch (InterruptedException|TimeoutException ex) {
resultB.complete(fallbackB);
resultC.complete(fallbackC);
}
try {
return result.get();
} catch (InterruptedException ex) {
throw new ExecutionException(ex);
}
};
}
有了这个,单个 FutureFunc
上的所有查询将一致地 return 相同的结果,即使它是基于第一次超时的回退值。此变体还始终将 combine
的执行从超时中排除。
当然,如果根本不需要不同的超时时间,您可以重构 getFuture()
以提前获得所需的超时时间,例如作为参数。这将大大简化实施,并且可以 return 再次成为未来:
public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.schedule(() -> resultB.complete(fallbackB), timeOut, u);
e.schedule(() -> resultC.complete(fallbackC), timeOut, u);
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
bAndC.thenRun(e::shutdown);
return bAndC.thenApply(ignoredVoid ->
combine(resultA.join(), resultB.join(), resultC.join()));
}