CompletableFuture 异常处理 runAsync & thenRun
CompletableFuture exception handling runAsync & thenRun
假设我有这个示例代码,但在 runAsync
中遇到异常。我的问题是,此异常是否会阻止 thenRun
执行,因为 thenRun
与此代码的调用方方法在同一线程中运行。
private void caller() {
CompletableFuture.runAsync(() -> {
try {
// some code
} catch (Exception e) {
throw new CustomException(errorMessage, e);
}
}, anInstanceOfTaskExecutor).thenRun(
// thenRun code
));
}
我已经完成了 线程,它解释了如何处理异步块抛出的异常(即通过阻塞和使用 join
)。我想知道 thenRun
块中的代码是否会在 CompletableFuture
completesExceptionally
.
时执行
更新:
我运行一些代码来测试这个:
CompletableFuture.runAsync(() -> {
List<Integer> integerList = new ArrayList<>();
integerList.get(1); // throws exception
}).thenRun(() -> {
System.out.println("No exception occurred");
});
它不打印任何内容,这意味着异常没有 'propagate up to/reach' 来自异步块的调用方方法的线程。我现在了解此处的预期行为,但我有以下问题:
- 为什么即使 CompletableFuture 异常完成,它也会默默地失败?
- 它在后台如何工作?
- 是否因为这两个线程(调用者线程和异步线程)都有自己的堆栈space?
这将取决于您要添加的步骤exceptionally
。
下面的情况会跳过thenRun
直接执行异常块
CompletableFuture.runAsync(() -> {
//process and throw exception
}, anInstanceOfTaskExecutor )
.thenRun(() -> {})
.exceptionally(exception -> {
// do something, handle exception
})
));
在这种情况下,它将执行thenRun
。
CompletableFuture.runAsync(() -> {
//process and throw exception
}, anInstanceOfTaskExecutor )
.exceptionally(exception -> {
// do something, handle exception
})
.thenRun(() -> {})
));
希望对你有所帮助
一般信息
CompletionStage
的文档解释了接口的一般规则:
A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage
completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. The functionality defined in this interface takes only a few basic forms, which expand out to a larger set of methods to capture a range of usage styles:
The computation performed by a stage may be expressed as a Function
, Consumer
, or Runnable
(using methods with names including apply, accept, or run, respectively) depending on whether it requires arguments and/or produces results. For example:
stage.thenApply(x -> square(x))
.thenAccept(x -> System.out.print(x))
.thenRun(() -> System.out.println());
An additional form (compose) allows the construction of computation pipelines from functions returning completion stages.
Any argument to a stage's computation is the outcome of a triggering stage's computation.
One stage's execution may be triggered by completion of a single stage, or both of two stages, or either of two stages. Dependencies on a single stage are arranged using methods with prefix then. Those triggered by completion of both of two stages may combine their results or effects, using correspondingly named methods. Those triggered by either of two stages make no guarantees about which of the results or effects are used for the dependent stage's computation.
Dependencies among stages control the triggering of computations, but do not otherwise guarantee any particular ordering. Additionally, execution of a new stage's computations may be arranged in any of three ways: default execution, default asynchronous execution (using methods with suffix async that employ the stage's default asynchronous execution facility), or custom (via a supplied Executor
). The execution properties of default and async modes are specified by CompletionStage
implementations, not this interface. Methods with explicit Executor
arguments may have arbitrary execution properties, and might not even support concurrent execution, but are arranged for processing in a way that accommodates asynchrony.
Two method forms (handle
and whenComplete
) support unconditional computation whether the triggering stage completed normally or exceptionally. Method exceptionally
supports computation only when the triggering stage completes exceptionally, computing a replacement result, similarly to the java [sic] catch
keyword. In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException
holding the exception as its cause. If a stage is dependent on both of two stages, and both complete exceptionally, then the CompletionException
may correspond to either one of these exceptions. If a stage is dependent on either of two others, and only one of them completes exceptionally, no guarantees are made about whether the dependent stage completes normally or exceptionally. In the case of method whenComplete
, when the supplied action itself encounters an exception, then the stage completes exceptionally with this exception unless the source stage also completed exceptionally, in which case the exceptional completion from the source stage is given preference and propagated to the dependent stage.
All methods adhere to the above triggering, execution, and exceptional completion specifications (which are not repeated in individual method specifications). [...]
[...]
CompletableFuture
的文档解释了线程规则(和其他策略),其中,如上所述,其中一些留给 CompletionStage
的实现:
A Future
that may be explicitly completed (setting its value and status), and may be used as a CompletionStage
, supporting dependent functions and actions that trigger upon its completion.
When two or more threads attempt to complete
, completeExceptionally
, or cancel
a CompletableFuture
, only one of them succeeds.
In addition to these and related methods for directly manipulating status and results, CompletableFuture
implements interface CompletionStage
with the following policies:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture
, or by any other caller of a completion method.
All async methods without an explicit Executor
argument are performed using the ForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a new Thread
is created to run each task). This may be overridden for non-static methods in subclasses by defining method defaultExecutor()
. To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask
. Operations with time-delays can use adapter methods defined in this class, for example: supplyAsync(supplier, delayedExecutor(timeout, timeUnit))
. To support methods with delays and timeouts, this class maintains at most one daemon thread for triggering and cancelling actions, not for running them.
All CompletionStage
methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
All CompletionStage
methods return CompletableFuture
s. To restrict usages to only those methods defined in interface CompletionStage
, use method minimalCompletionStage()
. Or to ensure only that clients do not themselves modify a future, use method copy()
.
CompletableFuture
also implements Future
with the following policies:
Since (unlike FutureTask
) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel
has the same effect as completeExceptionally(new CancellationException())
. Method isCompletedExceptionally()
can be used to determine if a CompletableFuture
completed in any exceptional fashion.
In case of exceptional completion with a CompletionException
, methods get()
and get(long, TimeUnit)
throw an ExecutionException
with the same cause as held in the corresponding CompletionException
. To simplify usage in most contexts, this class also defines methods join()
and getNow(T)
that instead throw the CompletionException
directly in these cases.
[...]
您的问题
这是您的示例代码:
CompletableFuture.runAsync(() -> {
List<Integer> integerList = new ArrayList<>();
integerList.get(1); // throws exception
}).thenRun(() -> {
System.out.println("No exception occurred");
});
如果您不知道,方法如 thenRun
return a new CompletionStage
。所以你的代码类似于下面这样:
CompletableFuture<Void> runAsyncStage = CompletableFuture.runAsync(() -> List.of().get(0));
CompletableFuture<Void> thenRunStage =
runAsyncStage.thenRun(() -> System.out.println("thenRun executing!"));
thenRunStage
由 runAsyncStage
的完成触发,在这种情况下,IndexOutOfBoundsException
保证异常完成。至于为什么Runnable
没有执行,那是因为CompletionStage#thenRun(Runnable)
的合约:
Returns a new CompletionStage
that, when this stage completes normally, executes the given action. See the CompletionStage
documentation for rules covering exceptional completion.
由于触发阶段异常完成,thenRunStage
阶段也异常完成,这意味着 Runnable
被跳过。
1。 "Why is it silently failing even though the CompletableFuture completesExceptionally?"
示例代码相当于用 try-catch 块吞下异常。您看不到异常是因为您没有编写报告异常的代码。 runAsyncStage
和 thenRunStage
阶段都异常完成,后者是因为前者异常完成。
如果您想了解阶段的异常 "within the chain",则必须使用 exceptionally[Async]
、handle[Async]
和 whenComplete[Async]
等阶段。这样做允许您根据触发阶段的正常或异常完成来更改链的行为。
如果你想知道阶段的异常"outside the chain",那么你必须使用join()
、get()
和get(long,TimeUnit)
等方法。如果该阶段异常完成,那么第一个阶段将抛出 CompletionException
包装失败原因,而后两个阶段将抛出 ExecutionException
包装失败原因。
2。 "How does it work in the background?"
CompletableFuture
的实现过于复杂,无法在 Stack Overflow 答案中进行解释。如果您想研究实现,可以查看源代码。您的 JDK 应该附带一个包含 Java 源文件的 src.zip
文件。也可以在OpenJDK repositories中在线查看源代码。例如,这里是 CompletableFuture
的 JDK 13 源代码:
3。 "Is it because both these threads (caller's thread & asynchronous thread) have their own stack space?"
除非两个线程之间存在某种通信,否则一个线程不会知道另一个线程中的异常。 join()
等调用方法将在适当的时候将异常传递给将抛出所述异常的调用线程。但是,正如您对第一个问题的回答所示,它比这稍微复杂一些。即使线程在单个阶段中抛出异常,您也不会看到堆栈跟踪或类似的东西。这是因为异常 被捕获 并且该阶段被标记为失败 并且该异常是原因 。然后其他代码必须根据需要显式检索和处理该异常。
这与使用 ExecutorService
和 returned Future
对象没有什么不同。该任务可能会在后台失败,但在查询 Future
之前其他代码不会意识到这一点。
来自赏金:“我希望了解线程如何相互交互的细节。”
我不确定还要添加什么。 CompletionStage
API 是一个抽象 "above" 线程。您只需告诉 API 您希望命令链如何执行,包括每个阶段使用哪个线程池,实现就会为您处理所有线程间通信。也就是说,每个线程都做自己的事情,只是 API 旨在提供一种更简单和反应性的线程间通信方式。如果您对 如何实现 感兴趣,那么我建议您研究源代码(链接在上面)。
假设我有这个示例代码,但在 runAsync
中遇到异常。我的问题是,此异常是否会阻止 thenRun
执行,因为 thenRun
与此代码的调用方方法在同一线程中运行。
private void caller() {
CompletableFuture.runAsync(() -> {
try {
// some code
} catch (Exception e) {
throw new CustomException(errorMessage, e);
}
}, anInstanceOfTaskExecutor).thenRun(
// thenRun code
));
}
我已经完成了 join
)。我想知道 thenRun
块中的代码是否会在 CompletableFuture
completesExceptionally
.
更新:
我运行一些代码来测试这个:
CompletableFuture.runAsync(() -> {
List<Integer> integerList = new ArrayList<>();
integerList.get(1); // throws exception
}).thenRun(() -> {
System.out.println("No exception occurred");
});
它不打印任何内容,这意味着异常没有 'propagate up to/reach' 来自异步块的调用方方法的线程。我现在了解此处的预期行为,但我有以下问题:
- 为什么即使 CompletableFuture 异常完成,它也会默默地失败?
- 它在后台如何工作?
- 是否因为这两个线程(调用者线程和异步线程)都有自己的堆栈space?
这将取决于您要添加的步骤exceptionally
。
下面的情况会跳过thenRun
直接执行异常块
CompletableFuture.runAsync(() -> {
//process and throw exception
}, anInstanceOfTaskExecutor )
.thenRun(() -> {})
.exceptionally(exception -> {
// do something, handle exception
})
));
在这种情况下,它将执行thenRun
。
CompletableFuture.runAsync(() -> {
//process and throw exception
}, anInstanceOfTaskExecutor )
.exceptionally(exception -> {
// do something, handle exception
})
.thenRun(() -> {})
));
希望对你有所帮助
一般信息
CompletionStage
的文档解释了接口的一般规则:
A stage of a possibly asynchronous computation, that performs an action or computes a value when another
CompletionStage
completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. The functionality defined in this interface takes only a few basic forms, which expand out to a larger set of methods to capture a range of usage styles:
The computation performed by a stage may be expressed as a
Function
,Consumer
, orRunnable
(using methods with names including apply, accept, or run, respectively) depending on whether it requires arguments and/or produces results. For example:stage.thenApply(x -> square(x)) .thenAccept(x -> System.out.print(x)) .thenRun(() -> System.out.println());
An additional form (compose) allows the construction of computation pipelines from functions returning completion stages.
Any argument to a stage's computation is the outcome of a triggering stage's computation.
One stage's execution may be triggered by completion of a single stage, or both of two stages, or either of two stages. Dependencies on a single stage are arranged using methods with prefix then. Those triggered by completion of both of two stages may combine their results or effects, using correspondingly named methods. Those triggered by either of two stages make no guarantees about which of the results or effects are used for the dependent stage's computation.
Dependencies among stages control the triggering of computations, but do not otherwise guarantee any particular ordering. Additionally, execution of a new stage's computations may be arranged in any of three ways: default execution, default asynchronous execution (using methods with suffix async that employ the stage's default asynchronous execution facility), or custom (via a supplied
Executor
). The execution properties of default and async modes are specified byCompletionStage
implementations, not this interface. Methods with explicitExecutor
arguments may have arbitrary execution properties, and might not even support concurrent execution, but are arranged for processing in a way that accommodates asynchrony.Two method forms (
handle
andwhenComplete
) support unconditional computation whether the triggering stage completed normally or exceptionally. Methodexceptionally
supports computation only when the triggering stage completes exceptionally, computing a replacement result, similarly to the java [sic]catch
keyword. In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with aCompletionException
holding the exception as its cause. If a stage is dependent on both of two stages, and both complete exceptionally, then theCompletionException
may correspond to either one of these exceptions. If a stage is dependent on either of two others, and only one of them completes exceptionally, no guarantees are made about whether the dependent stage completes normally or exceptionally. In the case of methodwhenComplete
, when the supplied action itself encounters an exception, then the stage completes exceptionally with this exception unless the source stage also completed exceptionally, in which case the exceptional completion from the source stage is given preference and propagated to the dependent stage.All methods adhere to the above triggering, execution, and exceptional completion specifications (which are not repeated in individual method specifications). [...]
[...]
CompletableFuture
的文档解释了线程规则(和其他策略),其中,如上所述,其中一些留给 CompletionStage
的实现:
A
Future
that may be explicitly completed (setting its value and status), and may be used as aCompletionStage
, supporting dependent functions and actions that trigger upon its completion.When two or more threads attempt to
complete
,completeExceptionally
, orcancel
aCompletableFuture
, only one of them succeeds.In addition to these and related methods for directly manipulating status and results,
CompletableFuture
implements interfaceCompletionStage
with the following policies:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.All async methods without an explicit
Executor
argument are performed using theForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a newThread
is created to run each task). This may be overridden for non-static methods in subclasses by defining methoddefaultExecutor()
. To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interfaceCompletableFuture.AsynchronousCompletionTask
. Operations with time-delays can use adapter methods defined in this class, for example:supplyAsync(supplier, delayedExecutor(timeout, timeUnit))
. To support methods with delays and timeouts, this class maintains at most one daemon thread for triggering and cancelling actions, not for running them.All
CompletionStage
methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.All
CompletionStage
methods returnCompletableFuture
s. To restrict usages to only those methods defined in interfaceCompletionStage
, use methodminimalCompletionStage()
. Or to ensure only that clients do not themselves modify a future, use methodcopy()
.
CompletableFuture
also implementsFuture
with the following policies:
Since (unlike
FutureTask
) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Methodcancel
has the same effect ascompleteExceptionally(new CancellationException())
. MethodisCompletedExceptionally()
can be used to determine if aCompletableFuture
completed in any exceptional fashion.In case of exceptional completion with a
CompletionException
, methodsget()
andget(long, TimeUnit)
throw anExecutionException
with the same cause as held in the correspondingCompletionException
. To simplify usage in most contexts, this class also defines methodsjoin()
andgetNow(T)
that instead throw theCompletionException
directly in these cases.[...]
您的问题
这是您的示例代码:
CompletableFuture.runAsync(() -> {
List<Integer> integerList = new ArrayList<>();
integerList.get(1); // throws exception
}).thenRun(() -> {
System.out.println("No exception occurred");
});
如果您不知道,方法如 thenRun
return a new CompletionStage
。所以你的代码类似于下面这样:
CompletableFuture<Void> runAsyncStage = CompletableFuture.runAsync(() -> List.of().get(0));
CompletableFuture<Void> thenRunStage =
runAsyncStage.thenRun(() -> System.out.println("thenRun executing!"));
thenRunStage
由 runAsyncStage
的完成触发,在这种情况下,IndexOutOfBoundsException
保证异常完成。至于为什么Runnable
没有执行,那是因为CompletionStage#thenRun(Runnable)
的合约:
Returns a new
CompletionStage
that, when this stage completes normally, executes the given action. See theCompletionStage
documentation for rules covering exceptional completion.
由于触发阶段异常完成,thenRunStage
阶段也异常完成,这意味着 Runnable
被跳过。
1。 "Why is it silently failing even though the CompletableFuture completesExceptionally?"
示例代码相当于用 try-catch 块吞下异常。您看不到异常是因为您没有编写报告异常的代码。 runAsyncStage
和 thenRunStage
阶段都异常完成,后者是因为前者异常完成。
如果您想了解阶段的异常 "within the chain",则必须使用 exceptionally[Async]
、handle[Async]
和 whenComplete[Async]
等阶段。这样做允许您根据触发阶段的正常或异常完成来更改链的行为。
如果你想知道阶段的异常"outside the chain",那么你必须使用join()
、get()
和get(long,TimeUnit)
等方法。如果该阶段异常完成,那么第一个阶段将抛出 CompletionException
包装失败原因,而后两个阶段将抛出 ExecutionException
包装失败原因。
2。 "How does it work in the background?"
CompletableFuture
的实现过于复杂,无法在 Stack Overflow 答案中进行解释。如果您想研究实现,可以查看源代码。您的 JDK 应该附带一个包含 Java 源文件的 src.zip
文件。也可以在OpenJDK repositories中在线查看源代码。例如,这里是 CompletableFuture
的 JDK 13 源代码:
3。 "Is it because both these threads (caller's thread & asynchronous thread) have their own stack space?"
除非两个线程之间存在某种通信,否则一个线程不会知道另一个线程中的异常。 join()
等调用方法将在适当的时候将异常传递给将抛出所述异常的调用线程。但是,正如您对第一个问题的回答所示,它比这稍微复杂一些。即使线程在单个阶段中抛出异常,您也不会看到堆栈跟踪或类似的东西。这是因为异常 被捕获 并且该阶段被标记为失败 并且该异常是原因 。然后其他代码必须根据需要显式检索和处理该异常。
这与使用 ExecutorService
和 returned Future
对象没有什么不同。该任务可能会在后台失败,但在查询 Future
之前其他代码不会意识到这一点。
来自赏金:“我希望了解线程如何相互交互的细节。”
我不确定还要添加什么。 CompletionStage
API 是一个抽象 "above" 线程。您只需告诉 API 您希望命令链如何执行,包括每个阶段使用哪个线程池,实现就会为您处理所有线程间通信。也就是说,每个线程都做自己的事情,只是 API 旨在提供一种更简单和反应性的线程间通信方式。如果您对 如何实现 感兴趣,那么我建议您研究源代码(链接在上面)。