Source.queue 出现意外行为

Source.queue giving unexpected behavior

无论我为 Source.queue 的 bufferSize 和 overflowStrategy 参数提供什么参数,结果总是类似于底部的输出。我期待看到报价调用和报价结果或多或少立即完成,并能够看到不同的处理和基于 bufferSize 和 overflowStrategy 的报价结果消息。我在这里做错了什么?


代码:

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("scratch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executionContext: ExecutionContextExecutor = system.dispatcher

    val start = Instant.now()
    def elapsed = time.Duration.between(start, Instant.now()).toMillis
    val intSource = Source.queue[Int](2, OverflowStrategy.dropHead)
    val intSink = Sink foreach { ii: Int =>
      Thread.sleep(1000)
      println(s"processing $ii at $elapsed")
    }
    val intChannel = intSource.to(intSink).run()
    (1 to 4) map { ii =>
      println(s"offer invocation for $ii at $elapsed")
      (ii, intChannel.offer(ii))
    } foreach { intFutureOfferResultPair =>
      val (ii, futureOfferResult) = intFutureOfferResultPair
      futureOfferResult onComplete { offerResult =>
        println(s"offer result for $ii: $offerResult at $elapsed")
      }
    }
    intChannel.complete()

    intChannel.watchCompletion.onComplete { _ => system.terminate() }
  }

输出:

offer invocation for 1 at 72
offer invocation for 2 at 77
offer invocation for 3 at 77
offer invocation for 4 at 77
offer result for 1: Success(Enqueued) at 90
processing 1 at 1084
offer result for 2: Success(Enqueued) at 1084
processing 2 at 2084
offer result for 3: Success(Enqueued) at 2084
processing 3 at 3084
offer result for 4: Success(Enqueued) at 3084
processing 4 at 4084

我可以通过替换获得预期的行为:

val intChannel = intSource.to(intSink).run()

与:

val (intChannel, futureDone) = intSource.async.toMat(intSink)(Keep.both).run()

和:

intChannel.watchCompletion.onComplete { _ => system.terminate() }

与:

futureDone.onComplete { _ => system.terminate() }

固定代码:

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("scratch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executionContext: ExecutionContextExecutor = system.dispatcher

    val start = Instant.now()
    def elapsed = time.Duration.between(start, Instant.now()).toMillis
    val intSource = Source.queue[Int](2, OverflowStrategy.dropHead)
    val intSink = Sink foreach { ii: Int =>
      Thread.sleep(1000)
      println(s"processing $ii at $elapsed")
    }
    val (intChannel, futureDone) = intSource.async.toMat(intSink)(Keep.both).run()
    (1 to 4) map { ii =>
      println(s"offer invocation for $ii at $elapsed")
      (ii, intChannel.offer(ii))
    } foreach { intFutureOfferResultPair =>
      val (ii, futureOfferResult) = intFutureOfferResultPair
      futureOfferResult onComplete { offerResult =>
        println(s"offer result for $ii: $offerResult at $elapsed")
      }
    }
    intChannel.complete()

    futureDone.onComplete { _ => system.terminate() }
  }

输出

offer invocation for 1 at 84
offer invocation for 2 at 89
offer invocation for 3 at 89
offer invocation for 4 at 89
offer result for 3: Success(Enqueued) at 110
offer result for 4: Success(Enqueued) at 110
offer result for 1: Success(Enqueued) at 110
offer result for 2: Success(Enqueued) at 110
processing 3 at 1102
processing 4 at 2102