不完全理解 Source.delay 方法或者 akka-stream 文档中有错误

Don't completely understand Source.delay method or there is error in akka-stream docs

我正在阅读关于 KillSwitch 的 akka-stream 文档,他们有一个示例来说明 KillSwitch.shutdown 方法:

val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2

我无法理解为什么预期结果应该是 2。 正如我看到的这个例子,流被延迟 1 秒。当它暂停时, shutdown() 被调用,因此终止开关告诉流在延迟完成之前关闭。我不明白为什么要发出流的前 2 个元素并将其传送到接收器。

能帮忙解释一下吗?

注意:如果我 运行 这个例子,我得到了我预期的以下异常:

Exception in thread "main" java.util.NoSuchElementException: last of empty stream
    at akka.stream.scaladsl.Sink$.$anonfun$last(Sink.scala:181)

对示例代码有误解。结果完全取决于 doSomethingElse 的运行时长。仅当花费的时间太少时才会出现异常。要对此进行测试,您可以将其替换为 Thread.sleep(2000),然后您将从 Sink 返回结果。如果您增加睡眠值,结果也会增加。

关于评论中的问题:

delay 按指定的量及时移动元素发射。延时精度为10ms,避免不必要的定时器调度周期。这就是您看到此行为的原因(您可以在 Flow 的 Scala 文档中查看这些详细信息)。

如果您想每秒发送一条消息,请尝试 throttle

.throttle(1, 1.second, 1, ThrottleMode.shaping)