如果关键的 Future 在 Scala 中完成,如何丢弃其他 Futures?

How to discard other Futures if the critical Future is finished in Scala?

假设我有三个远程调用来构建我的页面。其中一个 (X) 对页面至关重要,另外两个 (A, B) 仅用于增强体验。

因为criticalFutureX太重要了,不会受到futureAfutureB的影响,所以我希望所有远程调用的整体延迟不超过X。

这意味着,如果 criticalFutureX 完成,我想丢弃 futureAfutureB

val criticalFutureX = ...
val futureA = ...
val futureB = ...

// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
  x <- criticalFutureX
  a <- futureA
  b <- futureB
} ...

在上面的例子中,虽然它们是并行执行的,但整体延迟取决于X、A、B中最长的,这不是我想要的。

Latencies:
X: |----------|
A: |---------------|
B: |---|

O: |---------------| (overall latency)

firstCompletedOf但不能用来显式说"in case of completed of criticalFutureX"。

有没有类似下面的东西?

val criticalFutureX = ...
val futureA = ...
val futureB = ...

for {
  x <- criticalFutureX
  a <- futureA // discard when criticalFutureX finished
  b <- futureB // discard when criticalFutureX finished
} ...

X: |----------|
A: |-----------... discarded
B: |---|

O: |----------| (overall latency)

你可以通过承诺实现这一目标


  def completeOnMain[A, B](main: Future[A], secondary: Future[B]) = {
    val promise = Promise[Option[B]]()
    main.onComplete {
      case Failure(_) =>
      case Success(_) => promise.trySuccess(None)
    }
    secondary.onComplete {
      case Failure(exception) => promise.tryFailure(exception)
      case Success(value)     => promise.trySuccess(Option(value))
    }
    promise.future
  }

一些测试代码

  private def runFor(first: Int, second: Int) = {

    def run(millis: Int) = Future {
      Thread.sleep(millis);
      millis
    }

    val start = System.currentTimeMillis()
    val combined = for {
      _ <- Future.unit
      f1 = run(first)
      f2 = completeOnMain(f1, run(second))
      r1 <- f1
      r2 <- f2
    } yield (r1, r2)

    val result = Await.result(combined, 10.seconds)
    println(s"It took: ${System.currentTimeMillis() - start}: $result")
  }

  runFor(3000, 4000)
  runFor(3000, 1000)

产生

It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))

这种任务很难用 Scala 标准库 Futures 高效、可靠、安全地完成。没有办法中断一个还没有完成的Future,这意味着即使你选择忽略它的结果,它仍然会保留运行并浪费内存和CPU时间。即使有中断运行Future的方法,也无法确保分配的资源(网络连接、打开的文件等)将被正确释放。

我想指出 Ivan Stanislavciuc 给出的实现有一个错误:如果 main Future 失败,那么承诺将永远不会完成,这不太可能是你想要的。

因此,我强烈建议研究 ZIO 或 cats-effect 等现代并发效果系统。这些不仅更安全、更快,而且更容易。这是没有此错误的 ZIO 实现:

import zio.{Exit, Task}
import Function.tupled

def completeOnMain[A, B](
  main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
  (main.forkManaged zip secondary.forkManaged).use {
    tupled(_.join zip _.interrupt)
  }

Exit 是描述 secondary 任务如何结束的类型,即。 e.通过成功返回 B 或由于错误(类型 Throwable)或由于中断。

请注意,可以为该函数提供更复杂的签名,告诉您更多关于正在发生的事情,但我想在这里保持简单。