为什么这个Akka Streams应用会异常退出?

Why does this Akka Streams application exit abnormally?

我正在通过 Packt 的 Akka Cookbook 模块学习 Akka Streams。我 运行 TransformingStreamsApplication.scala 示例并得到这个:

为了让actor系统在流处理完成后退出,我添加了以下回调:

// Future[IOResult]
val future = stream.run()
future.onComplete(_ => system.terminate())

但是这次应用程序直接退出,没有任何控制台输出:

我想到的解决方法是添加 Thread.sleep(10000):

我想要对此行为的解释。

这是您referencing的信息流:

val stream = FileIO.fromPath(path)
  ...
  .to(Sink.foreach(println))

由于使用了 to 连接器,在上述流上调用 run() returns 源的物化值,在本例中是 Future[IOResult].发生的情况是您在流元素到达接收器之前终止了 actor 系统。

不是添加 Thread.sleep,而是使用 toMatKeep.right 更改流以产生接收器的物化值。这个物化值也是一个 Future,你可以在这个 Future 完成后终止 actor 系统:

val stream = FileIO.fromPath(path)
  ...
  .toMat(Sink.foreach(println))(Keep.right)

val future = stream.run()
future.onComplete(_ => system.terminate())

请注意,有一个 shorthand 方法,称为 runWith:

val stream = FileIO.fromPath(path)
  ...
  .runWith(Sink.foreach(println))

stream.onComplete(_ => system.terminate())