Scala Futures: Non-deterministic output

I'm new to Scala, and I'm practicing with the Futures library by writing a few retry schemes. Doing so, I came up with the following code:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def fCalc(): Future[Int] = Future(calc())

  resetRetries()

  val ff = fCalc() // 0 - should fail
    .fallbackTo(fCalc()) // 1 - should fail
    .fallbackTo(fCalc()) // 2 - should fail
    .fallbackTo(fCalc()) // 3 - should fail
    .fallbackTo(fCalc()) // 4 - should be a success

  Await.ready(ff, 10.second)

  println(ff.isCompleted)
  println(ff.value)
}

Every time I run this code I get a different result. Here are some examples of the results I get:

Output 1

I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Output 2

I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))

Output 3

I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

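The results do not always alternate between success and failure. There may be more than a couple of failed runs before a successful one appears.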

To my understanding, there should be only 4 logs of "I am thread x This is going to fail. Retry count x", and they should be:

I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4

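Not necessarily in this order, since I don't know how the Scala threading model works, but you get my point. Nevertheless, I am getting this non-deterministic output that I can't make sense of. So... my question is: where does this non-deterministic output come from?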

I'd like to mention that the following retry mechanism always produces the same result:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }

  resetRetries()
  // retry(calc())(5) returns a Future[Int], not a Future[Future[Int]]
  val retriableFuture: Future[Int] = retry(calc())(5)
  Await.ready(retriableFuture, 10.seconds)

  println(retriableFuture.isCompleted)
  println(retriableFuture.value)
}

Output

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))

If I reduce the number of retries (retry(calc())(3)), however, the result is the expected failure:

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

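This is not a Scala problem but a more general multithreading problem with the value of retries. You have multiple threads reading and writing this value without any synchronization, so you cannot predict when each thread will run or what value it will see.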

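It looks like the specific problem is that you test retries and only update it afterwards. It is possible for all four threads to test the value before any of them updates it. In that case they would all see 0 and throw an error.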

The solution is to make retries an AtomicInteger and use getAndIncrement. This atomically retrieves the value and increments it, so each thread sees the appropriate value.
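A minimal sketch of that suggestion, reusing the question's fallbackTo chain. The object name AtomicRetries and the run() helper are hypothetical, added only so the result can be inspected. The log order can still vary between runs, but no two attempts can now see the same count, so exactly one of the five calls succeeds and, because fallbackTo succeeds whenever any future in the chain succeeds, the result should always be Success(10):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object AtomicRetries {
  // Shared counter; getAndIncrement reads and increments it in one atomic step.
  val retries = new AtomicInteger(0)

  def calc(): Int = {
    // Every call gets a distinct value (0, 1, 2, ...), so two calls can never
    // both observe the same count, unlike the check-then-update on a plain var.
    val attempt = retries.getAndIncrement()
    if (attempt > 3) 10
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count ${attempt + 1}")
      throw new IllegalArgumentException("This failed")
    }
  }

  def fCalc(): Future[Int] = Future(calc())

  def run(): Try[Int] = {
    retries.set(0)
    val ff = fCalc()
      .fallbackTo(fCalc())
      .fallbackTo(fCalc())
      .fallbackTo(fCalc())
      .fallbackTo(fCalc())
    Await.ready(ff, 10.seconds)
    // Exactly one of the five calls sees attempt == 4 and returns 10.
    ff.value.get
  }

  def main(args: Array[String]): Unit = println(run())
}
```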


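Update following the comments: another answer already explains why several threads are started at the same time, so I won't repeat it here. When multiple threads run in parallel, the order of their logs is always non-deterministic.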

While @Tim is technically correct, I don't think he really answers the question.

I believe the real source of your confusion is a misunderstanding of what the construct

f.fallbackTo(Future(calc()))

actually does, and how it differs from

f.recoverWith({ case _ => Future(calc())})

There are two important differences:

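  1. With fallbackTo, Future(calc()) is created immediately and therefore (almost) immediately starts executing calc(). As a result, the original future and the fallback future run concurrently. With recoverWith, the fallback future is created only after the original future has failed. This difference affects the logging order. It also means that access to var retries is concurrent, so you may see cases where all threads actually fail because some updates to retries are lost.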

  2. Another tricky point is that fallbackTo is documented as (emphasis mine):

Creates a new future which holds the result of this future if it was completed successfully, or, if not, the result of the that future if that is completed successfully. If both futures are failed, the resulting future holds the throwable object of the first future.

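This difference does not really affect your example, because the exception you throw in every failed attempt is the same, but it could affect the result if they were different. For example, if you modify your code to: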

  def calc(attempt: Int) = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException(s"This failed $attempt")
  }

  def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))

  val ff = fCalc(1) // 0 - should fail
      .fallbackTo(fCalc(2)) // 1 - should fail
      .fallbackTo(fCalc(3)) // 2 - should fail
      .fallbackTo(fCalc(4)) // 3 - should fail
      .fallbackTo(fCalc(5)) // 4 - should be a success

then you should get one of these two results

Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))

and never any other "failed" value.

Note that I explicitly pass attempt here so as not to run into the race condition on retries.
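Both differences can be observed directly in a small sketch. Here a Promise that is never completed stands in for an original future that is still running; FallbackVsRecover, eagerness() and firstFailureWins() are hypothetical names used only for illustration. The first check shows that the fallbackTo argument starts running while the original is still pending, whereas the recoverWith fallback is never even created; the second shows that when both futures fail, the first future's exception is kept:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FallbackVsRecover {
  // Difference 1: fallbackTo takes an already-built Future, recoverWith builds it lazily.
  def eagerness(): (Boolean, Boolean) = {
    val primary = Promise[Int]() // never completed, i.e. "still running"

    val fallbackStarted = new CountDownLatch(1)
    primary.future.fallbackTo(Future { fallbackStarted.countDown(); 1 })
    // The argument future is scheduled right away, although primary has not failed.
    val fallbackRan = fallbackStarted.await(5, TimeUnit.SECONDS)

    val recoverStarted = new AtomicBoolean(false)
    primary.future.recoverWith { case _ => Future { recoverStarted.set(true); 2 } }
    // The partial function only runs after primary fails, which never happens here.
    (fallbackRan, recoverStarted.get)
  }

  // Difference 2: if both futures fail, fallbackTo keeps the FIRST future's exception.
  def firstFailureWins(): Try[Int] = {
    val first  = Future[Int](throw new IllegalArgumentException("This failed 1"))
    val second = Future[Int](throw new IllegalArgumentException("This failed 2"))
    val ff = first.fallbackTo(second)
    Await.ready(ff, 5.seconds)
    ff.value.get
  }

  def main(args: Array[String]): Unit = {
    println(eagerness())
    println(firstFailureWins())
  }
}
```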


Answer to further comments (January 28)

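The reason I explicitly pass attempt in the previous example is that it is the simplest way to make sure that the IllegalArgumentException created by the logically first calc gets 1 as its value under all (even not very realistic) thread schedules.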

If you just want all the logs to show different values, there is a much easier way: use a local variable!

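  // Shared counter; each call copies its own value into a local val
  val atomicRetries = new java.util.concurrent.atomic.AtomicInteger(0)

  def calc() = {
    val retries = atomicRetries.getAndIncrement()
    if (retries > 3) 10
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
      throw new IllegalArgumentException(s"This failed $retries")
    }
  }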

This avoids the classic TOCTOU (time-of-check to time-of-use) problem.

This is what finally worked for me:

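The following code for the calc() method fully solves the issues with the duplicated log lines and the non-deterministic results of the futures: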

  var time = 0
  def resetTries(): Unit = time = 0

  // The check and the update of time happen under the same lock,
  // so no two futures can observe the same value
  def calc() = this.synchronized {
    if (time > 3) 10 else {
      time += 1
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
      throw new IllegalStateException("not yet")
    }
  }

No AtomicInteger is needed; in my opinion it makes things more complicated. What is needed is a synchronized wrapper.

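I have to stress that this is for demonstration purposes only, and using such a design in production code is probably not the best idea (calls to the calc method block). A recoverWith implementation should be used instead.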
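A minimal sketch of such a non-blocking recoverWith version follows. It borrows the idea from the answer above of passing the attempt number explicitly instead of sharing a var, so there is nothing to lock; RecoverWithRetries, run and maxAttempts are hypothetical names. Each attempt is created only after the previous one has failed, so in this version the log lines also come out in order, although they may still be printed by different threads:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RecoverWithRetries {
  // No shared mutable state: the attempt number travels as an argument.
  // Attempts 1 to 4 fail, attempt 5 succeeds, mirroring the question's calc().
  def calc(attempt: Int): Int =
    if (attempt > 4) 10
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $attempt")
      throw new IllegalArgumentException(s"This failed $attempt")
    }

  // Starts attempt n + 1 only after attempt n has failed, up to maxAttempts in total.
  // If the last attempt fails, its exception is the final result.
  def retry[T](op: Int => T, attempt: Int, maxAttempts: Int): Future[T] =
    Future(op(attempt)).recoverWith {
      case _ if attempt < maxAttempts => retry(op, attempt + 1, maxAttempts)
    }

  def run(maxAttempts: Int): Try[Int] = {
    val f = retry[Int](calc, 1, maxAttempts)
    Await.ready(f, 10.seconds)
    f.value.get
  }

  def main(args: Array[String]): Unit = {
    println(run(5)) // attempts 1 to 4 fail, attempt 5 succeeds
    println(run(4)) // all four attempts fail
  }
}
```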

Thanks to @SergGr, @Tim and @MichalPolitowksi for the help.