如果关键的 Future 在 Scala 中完成,如何丢弃其他 Futures?
How to discard other Futures if the critical Future is finished in Scala?
假设我有三个远程调用来构建我的页面。其中一个 (X) 对页面至关重要,另外两个 (A, B) 仅用于增强体验。
因为criticalFutureX
太重要了,不会受到futureA
和futureB
的影响,所以我希望所有远程调用的整体延迟不超过X。
这意味着,如果 criticalFutureX
完成,我想丢弃 futureA
和 futureB
。
val criticalFutureX = ...
val futureA = ...
val futureB = ...
// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
x <- criticalFutureX
a <- futureA
b <- futureB
} ...
在上面的例子中,虽然它们是并行执行的,但整体延迟取决于X、A、B中最长的,这不是我想要的。
Latencies:
X: |----------|
A: |---------------|
B: |---|
O: |---------------| (overall latency)
有firstCompletedOf但不能用来显式说"in case of completed of criticalFutureX"。
有没有类似下面的东西?
val criticalFutureX = ...
val futureA = ...
val futureB = ...
for {
x <- criticalFutureX
a <- futureA // discard when criticalFutureX finished
b <- futureB // discard when criticalFutureX finished
} ...
X: |----------|
A: |-----------... discarded
B: |---|
O: |----------| (overall latency)
你可以通过承诺实现这一目标
def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
val promise = Promise[Option[B]]()
main.onComplete {
case Failure(_) =>
case Success(_) => promise.trySuccess(None)
}
secondary.onComplete {
case Failure(exception) => promise.tryFailure(exception)
case Success(value) => promise.trySuccess(Option(value))
}
promise.future
}
一些测试代码
private def runFor(first: Int, second: Int) = {
def run(millis: Int) = Future {
Thread.sleep(millis);
millis
}
val start = System.currentTimeMillis()
val combined = for {
_ <- Future.unit
f1 = run(first)
f2 = completeOnMain(f1, run(second))
r1 <- f1
r2 <- f2
} yield (r1, r2)
val result = Await.result(combined, 10.seconds)
println(s"It took: ${System.currentTimeMillis() - start}: $result")
}
runFor(3000, 4000)
runFor(3000, 1000)
产生
It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))
这种任务很难用 Scala 标准库 Futures 高效、可靠、安全地完成。没有办法中断一个还没有完成的Future
,这意味着即使你选择忽略它的结果,它仍然会保留运行并浪费内存和CPU时间。即使有中断运行Future
的方法,也无法确保分配的资源(网络连接、打开的文件等)将被正确释放。
我想指出 Ivan Stanislavciuc 给出的实现有一个错误:如果 main
Future 失败,那么承诺将永远不会完成,这不太可能是你想要的。
因此,我强烈建议研究 ZIO 或 cats-effect 等现代并发效果系统。这些不仅更安全、更快,而且更容易。这是没有此错误的 ZIO 实现:
import zio.{Exit, Task}
import Function.tupled
def completeOnMain[A, B](
main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
(main.forkManaged zip secondary.forkManaged).use {
tupled(_.join zip _.interrupt)
}
Exit
是描述 secondary
任务如何结束的类型,即。 e.通过成功返回 B
或由于错误(类型 Throwable
)或由于中断。
请注意,可以为该函数提供更复杂的签名,告诉您更多关于正在发生的事情,但我想在这里保持简单。
假设我有三个远程调用来构建我的页面。其中一个 (X) 对页面至关重要,另外两个 (A, B) 仅用于增强体验。
因为criticalFutureX
太重要了,不会受到futureA
和futureB
的影响,所以我希望所有远程调用的整体延迟不超过X。
这意味着,如果 criticalFutureX
完成,我想丢弃 futureA
和 futureB
。
val criticalFutureX = ...
val futureA = ...
val futureB = ...
// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
x <- criticalFutureX
a <- futureA
b <- futureB
} ...
在上面的例子中,虽然它们是并行执行的,但整体延迟取决于X、A、B中最长的,这不是我想要的。
Latencies:
X: |----------|
A: |---------------|
B: |---|
O: |---------------| (overall latency)
有firstCompletedOf但不能用来显式说"in case of completed of criticalFutureX"。
有没有类似下面的东西?
val criticalFutureX = ...
val futureA = ...
val futureB = ...
for {
x <- criticalFutureX
a <- futureA // discard when criticalFutureX finished
b <- futureB // discard when criticalFutureX finished
} ...
X: |----------|
A: |-----------... discarded
B: |---|
O: |----------| (overall latency)
你可以通过承诺实现这一目标
def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
val promise = Promise[Option[B]]()
main.onComplete {
case Failure(_) =>
case Success(_) => promise.trySuccess(None)
}
secondary.onComplete {
case Failure(exception) => promise.tryFailure(exception)
case Success(value) => promise.trySuccess(Option(value))
}
promise.future
}
一些测试代码
private def runFor(first: Int, second: Int) = {
def run(millis: Int) = Future {
Thread.sleep(millis);
millis
}
val start = System.currentTimeMillis()
val combined = for {
_ <- Future.unit
f1 = run(first)
f2 = completeOnMain(f1, run(second))
r1 <- f1
r2 <- f2
} yield (r1, r2)
val result = Await.result(combined, 10.seconds)
println(s"It took: ${System.currentTimeMillis() - start}: $result")
}
runFor(3000, 4000)
runFor(3000, 1000)
产生
It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))
这种任务很难用 Scala 标准库 Futures 高效、可靠、安全地完成。没有办法中断一个还没有完成的Future
,这意味着即使你选择忽略它的结果,它仍然会保留运行并浪费内存和CPU时间。即使有中断运行Future
的方法,也无法确保分配的资源(网络连接、打开的文件等)将被正确释放。
我想指出 Ivan Stanislavciuc 给出的实现有一个错误:如果 main
Future 失败,那么承诺将永远不会完成,这不太可能是你想要的。
因此,我强烈建议研究 ZIO 或 cats-effect 等现代并发效果系统。这些不仅更安全、更快,而且更容易。这是没有此错误的 ZIO 实现:
import zio.{Exit, Task}
import Function.tupled
def completeOnMain[A, B](
main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
(main.forkManaged zip secondary.forkManaged).use {
tupled(_.join zip _.interrupt)
}
Exit
是描述 secondary
任务如何结束的类型,即。 e.通过成功返回 B
或由于错误(类型 Throwable
)或由于中断。
请注意,可以为该函数提供更复杂的签名,告诉您更多关于正在发生的事情,但我想在这里保持简单。