如果 Source 内部发生错误,为什么流不继续

Why doesn't the stream continue if the error occurs inside the Source

我写了一个 Scala 流应用程序。我的 objective 是如果在流处理过程中某个项目在处理过程中遇到错误,那么流将忽略它并继续处理剩余的项目。

为了这个目标,我写了这段代码

object StreamRestart extends App {
   implicit val actorSystem = ActorSystem()
   implicit val ec : ExecutionContext = actorSystem.dispatcher
   var failed: ListBuffer[Int] = ListBuffer()
   val decider : Supervision.Decider = {
      case x : Exception => {
         println(s"failed with error $x")
         failed += x.getMessage.toInt
         Supervision.Restart
      }
   }
   implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
   def source : Source[Int, NotUsed] = {
      val iter  = (1 to 1000 toStream) map {x => if (x == 600) throw new Exception(x.toString) else x}
      Source(iter)
   }

   val sink : Sink[Int, Future[Done]] = Sink.foreach[Int]{i => println(i)}
   val future : Future[Done] = source.runWith(sink)
   val f = future.map{_ =>
      println("completed")
      actorSystem.terminate()
   }
   Await.result(f, Duration.Inf)
   println(s"who failed ${failed.toList}")
}

即使我的决策者说 "Restart",上面的代码也会在 600 处崩溃。如果我将此异常端移动到 "flow",则决策程序开始工作并且流处理直到达到 1000。但是如果错误发生在源函数内部,应用程序将崩溃。

有没有办法让我的应用程序万无一失,始终到达终点。否则我们如何在源函数中发生错误时从错误中恢复。

源中的错误意味着该源实例出现不可恢复的故障,这意味着流无法继续使用该源,但您需要切换到其他源:如果您正在收听一个 websocket 和 http 服务器出现故障,您将无法在该 websocket 上再次收听。

Akka 流提供 recoverWithRetries 如文档中所述,以移动到另一个源,或者更一般地说,用具有相同形状的流元素替换流的一部分。

我强烈建议阅读有关 handling errors in akka streams 的文档。要从故障中恢复,您可以使用名为“recover”的运算符或类似“Restart*”的运算符 sources/sinks/flows.

重启阶段是最通用的,也是最强大的,所以看看吧。监督也存在,它会自动处理这些事情,但它必须得到给定操作员的支持。 (大多数内置运算符都这样做,但不是全部,请查看他们的文档)