具有回调和上下文切换的scala Future
scala Future with callback and context switching
我试图避免在我的 Future 地图回调中进行上下文切换。我看到 akka 有 SameThreadExecutionContext 应该处理这种类型的回调,但我不确定我是否完全理解它:
val ec1 = ExecutionContext.fromExecutorService(...)
val ec2 = ExecutionContext.fromExecutorService(...)
println("0 " + Thread.currentThread().getName)
def futureOnEc1 = Future {
println(s"1 " + Thread.currentThread().getName)
}(ec1)
futureOnEc1.map { a =>
println(s"2 " + Thread.currentThread().getName)
a + 1
}(AkkaSameThreadExecutionContext)
我以为我会得到:
0 pool-2-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
但实际结果是
0 pool-2-thread-1
1 pool-1-thread-1
2 pool-2-thread-1
我想念什么?重点是 运行 未来同一线程上的回调,而不是调用原始未来的线程。
当 future 尚未完成时,在同一个线程池中调用回调 ec1
。通过将 Thread.sleep(1000)
添加到您的 Future 主体中来测试它。
这段代码确实如您所愿
println("0 " + Thread.currentThread().getName)
val futureOnEc1 = Future {
Thread.sleep(1000)
println(s"1 " + Thread.currentThread().getName)
0
}(ec1)
futureOnEc1.map { a =>
println(s"2 " + Thread.currentThread().getName)
a + 1
}(sameThreadExecutionContext)
打印
0 main
1 pool-1-thread-1
2 pool-1-thread-1
但是如果 future 已经完成,回调会立即由注册它的线程执行。
删除 Thread.sleep
并在
之后打印相同的代码
0 main
1 pool-1-thread-1
2 main
编辑:
来自 scala.concurrent.Future#onComplete
的文档指出了这种行为。
When this future is completed, either through an exception, or a value, apply the provided function. If the future has already been completed, this will either be applied immediately or be scheduled asynchronously.
从 scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback
Tries to add the callback, if already completed, it dispatches the callback to be executed.
在使用 Scala Future
s 时避免上下文切换的巧妙技巧在于使用 parasitic
作为 ExecutionContext
,"steals execution time from other threads by having its Runnables run on the Thread which calls execute and then yielding back control to the caller after all its Runnables have been executed"。 parasitic
自 Scala 2.13 起可用,但您可以通过查看 its code (here for version 2.13.1) 轻松理解它并将其移植到 2.13 之前的项目。对于 2.13 之前的项目,一个天真但有效的实现将简单地 运行 Runnable
而不关心在线程上分派它们,这就达到了目的,如以下代码片段所示:
object parasitic212 extends ExecutionContext {
override def execute(runnable: Runnable): Unit =
runnable.run()
// reporting failures is left as an exercise for the reader
override def reportFailure(cause: Throwable): Unit = ???
}
parasitic
的实现当然更加细致。为了更深入地了解其使用的推理和一些注意事项,我建议您参考 the PR the introduced parasitic
作为公开可用的 API(它已经实现但保留供内部使用)。
引用原始 PR 描述:
A synchronous, trampolining, ExecutionContext has been used for a long time within the Future implementation to run controlled logic as cheaply as possible.
I believe that there is a significant number of use-cases where it makes sense, for efficiency, to execute logic synchronously in a safe(-ish) way without having users to implement the logic for that ExecutionContext themselves—it is tricky to implement to say the least.
It is important to remember that ExecutionContext should be supplied via an implicit parameter, so that the caller can decide where logic should be executed. The use of ExecutionContext.parasitic means that logic may end up running on Threads/Pools that were not designed or intended to run specified logic. For instance, you may end up running CPU-bound logic on an IO-designed pool or vice versa. So use of parasitic is only advisable when it really makes sense. There is also a real risk of hitting WhosebugErrors for certain patterns of nested invocations where a deep call chain ends up in the parasitic executor, leading to even more stack usage in the subsequent execution. Currently the parasitic ExecutionContext will allow a nested sequence of invocations at max 16, this may be changed in the future if it is discovered to cause problems.
如 the official documentation for parasitic
中所建议,建议您仅在执行的代码快速 returns 控制调用方时才使用它。以下是版本 2.13.1 引用的文档:
WARNING: Only ever execute logic which will quickly return control to the caller.
This ExecutionContext steals execution time from other threads by having its Runnables run on the Thread which calls execute and then yielding back control to the caller after all its Runnables have been executed. Nested invocations of execute will be trampolined to prevent uncontrolled stack space growth.
When using parasitic with abstractions such as Future it will in many cases be non-deterministic as to which Thread will be executing the logic, as it depends on when/if that Future is completed.
Do not call any blocking code in the Runnables submitted to this ExecutionContext as it will prevent progress by other enqueued Runnables and the calling Thread.
Symptoms of misuse of this ExecutionContext include, but are not limited to, deadlocks and severe performance problems.
Any NonFatal or InterruptedExceptions will be reported to the defaultReporter.
我试图避免在我的 Future 地图回调中进行上下文切换。我看到 akka 有 SameThreadExecutionContext 应该处理这种类型的回调,但我不确定我是否完全理解它:
val ec1 = ExecutionContext.fromExecutorService(...)
val ec2 = ExecutionContext.fromExecutorService(...)
println("0 " + Thread.currentThread().getName)
def futureOnEc1 = Future {
println(s"1 " + Thread.currentThread().getName)
}(ec1)
futureOnEc1.map { a =>
println(s"2 " + Thread.currentThread().getName)
a + 1
}(AkkaSameThreadExecutionContext)
我以为我会得到:
0 pool-2-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
但实际结果是
0 pool-2-thread-1
1 pool-1-thread-1
2 pool-2-thread-1
我想念什么?重点是 运行 未来同一线程上的回调,而不是调用原始未来的线程。
当 future 尚未完成时,在同一个线程池中调用回调 ec1
。通过将 Thread.sleep(1000)
添加到您的 Future 主体中来测试它。
这段代码确实如您所愿
println("0 " + Thread.currentThread().getName)
val futureOnEc1 = Future {
Thread.sleep(1000)
println(s"1 " + Thread.currentThread().getName)
0
}(ec1)
futureOnEc1.map { a =>
println(s"2 " + Thread.currentThread().getName)
a + 1
}(sameThreadExecutionContext)
打印
0 main
1 pool-1-thread-1
2 pool-1-thread-1
但是如果 future 已经完成,回调会立即由注册它的线程执行。
删除 Thread.sleep
并在
0 main
1 pool-1-thread-1
2 main
编辑:
来自 scala.concurrent.Future#onComplete
的文档指出了这种行为。
When this future is completed, either through an exception, or a value, apply the provided function. If the future has already been completed, this will either be applied immediately or be scheduled asynchronously.
从 scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback
Tries to add the callback, if already completed, it dispatches the callback to be executed.
在使用 Scala Future
s 时避免上下文切换的巧妙技巧在于使用 parasitic
作为 ExecutionContext
,"steals execution time from other threads by having its Runnables run on the Thread which calls execute and then yielding back control to the caller after all its Runnables have been executed"。 parasitic
自 Scala 2.13 起可用,但您可以通过查看 its code (here for version 2.13.1) 轻松理解它并将其移植到 2.13 之前的项目。对于 2.13 之前的项目,一个天真但有效的实现将简单地 运行 Runnable
而不关心在线程上分派它们,这就达到了目的,如以下代码片段所示:
object parasitic212 extends ExecutionContext {
override def execute(runnable: Runnable): Unit =
runnable.run()
// reporting failures is left as an exercise for the reader
override def reportFailure(cause: Throwable): Unit = ???
}
parasitic
的实现当然更加细致。为了更深入地了解其使用的推理和一些注意事项,我建议您参考 the PR the introduced parasitic
作为公开可用的 API(它已经实现但保留供内部使用)。
引用原始 PR 描述:
A synchronous, trampolining, ExecutionContext has been used for a long time within the Future implementation to run controlled logic as cheaply as possible.
I believe that there is a significant number of use-cases where it makes sense, for efficiency, to execute logic synchronously in a safe(-ish) way without having users to implement the logic for that ExecutionContext themselves—it is tricky to implement to say the least.
It is important to remember that ExecutionContext should be supplied via an implicit parameter, so that the caller can decide where logic should be executed. The use of ExecutionContext.parasitic means that logic may end up running on Threads/Pools that were not designed or intended to run specified logic. For instance, you may end up running CPU-bound logic on an IO-designed pool or vice versa. So use of parasitic is only advisable when it really makes sense. There is also a real risk of hitting WhosebugErrors for certain patterns of nested invocations where a deep call chain ends up in the parasitic executor, leading to even more stack usage in the subsequent execution. Currently the parasitic ExecutionContext will allow a nested sequence of invocations at max 16, this may be changed in the future if it is discovered to cause problems.
如 the official documentation for parasitic
中所建议,建议您仅在执行的代码快速 returns 控制调用方时才使用它。以下是版本 2.13.1 引用的文档:
WARNING: Only ever execute logic which will quickly return control to the caller.
This ExecutionContext steals execution time from other threads by having its Runnables run on the Thread which calls execute and then yielding back control to the caller after all its Runnables have been executed. Nested invocations of execute will be trampolined to prevent uncontrolled stack space growth.
When using parasitic with abstractions such as Future it will in many cases be non-deterministic as to which Thread will be executing the logic, as it depends on when/if that Future is completed.
Do not call any blocking code in the Runnables submitted to this ExecutionContext as it will prevent progress by other enqueued Runnables and the calling Thread.
Symptoms of misuse of this ExecutionContext include, but are not limited to, deadlocks and severe performance problems.
Any NonFatal or InterruptedExceptions will be reported to the defaultReporter.