如果 Source 内部发生错误,为什么流不继续
Why doesn't the stream continue if the error occurs inside the Source
我写了一个 Scala 流应用程序。我的 objective 是如果在流处理过程中某个项目在处理过程中遇到错误,那么流将忽略它并继续处理剩余的项目。
为了这个目标,我写了这段代码
object StreamRestart extends App {
implicit val actorSystem = ActorSystem()
implicit val ec : ExecutionContext = actorSystem.dispatcher
var failed: ListBuffer[Int] = ListBuffer()
val decider : Supervision.Decider = {
case x : Exception => {
println(s"failed with error $x")
failed += x.getMessage.toInt
Supervision.Restart
}
}
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
def source : Source[Int, NotUsed] = {
val iter = (1 to 1000 toStream) map {x => if (x == 600) throw new Exception(x.toString) else x}
Source(iter)
}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int]{i => println(i)}
val future : Future[Done] = source.runWith(sink)
val f = future.map{_ =>
println("completed")
actorSystem.terminate()
}
Await.result(f, Duration.Inf)
println(s"who failed ${failed.toList}")
}
即使我的决策者说 "Restart",上面的代码也会在 600 处崩溃。如果我将此异常端移动到 "flow",则决策程序开始工作并且流处理直到达到 1000。但是如果错误发生在源函数内部,应用程序将崩溃。
有没有办法让我的应用程序万无一失,始终到达终点。否则我们如何在源函数中发生错误时从错误中恢复。
源中的错误意味着该源实例出现不可恢复的故障,这意味着流无法继续使用该源,但您需要切换到其他源:如果您正在收听一个 websocket 和 http 服务器出现故障,您将无法在该 websocket 上再次收听。
Akka 流提供 recoverWithRetries
如文档中所述,以移动到另一个源,或者更一般地说,用具有相同形状的流元素替换流的一部分。
我强烈建议阅读有关 handling errors in akka streams 的文档。要从故障中恢复,您可以使用名为“recover”的运算符或类似“Restart*”的运算符 sources/sinks/flows.
重启阶段是最通用的,也是最强大的,所以看看吧。监督也存在,它会自动处理这些事情,但它必须得到给定操作员的支持。 (大多数内置运算符都这样做,但不是全部,请查看他们的文档)
我写了一个 Scala 流应用程序。我的 objective 是如果在流处理过程中某个项目在处理过程中遇到错误,那么流将忽略它并继续处理剩余的项目。
为了这个目标,我写了这段代码
object StreamRestart extends App {
implicit val actorSystem = ActorSystem()
implicit val ec : ExecutionContext = actorSystem.dispatcher
var failed: ListBuffer[Int] = ListBuffer()
val decider : Supervision.Decider = {
case x : Exception => {
println(s"failed with error $x")
failed += x.getMessage.toInt
Supervision.Restart
}
}
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
def source : Source[Int, NotUsed] = {
val iter = (1 to 1000 toStream) map {x => if (x == 600) throw new Exception(x.toString) else x}
Source(iter)
}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int]{i => println(i)}
val future : Future[Done] = source.runWith(sink)
val f = future.map{_ =>
println("completed")
actorSystem.terminate()
}
Await.result(f, Duration.Inf)
println(s"who failed ${failed.toList}")
}
即使我的决策者说 "Restart",上面的代码也会在 600 处崩溃。如果我将此异常端移动到 "flow",则决策程序开始工作并且流处理直到达到 1000。但是如果错误发生在源函数内部,应用程序将崩溃。
有没有办法让我的应用程序万无一失,始终到达终点。否则我们如何在源函数中发生错误时从错误中恢复。
源中的错误意味着该源实例出现不可恢复的故障,这意味着流无法继续使用该源,但您需要切换到其他源:如果您正在收听一个 websocket 和 http 服务器出现故障,您将无法在该 websocket 上再次收听。
Akka 流提供 recoverWithRetries
如文档中所述,以移动到另一个源,或者更一般地说,用具有相同形状的流元素替换流的一部分。
我强烈建议阅读有关 handling errors in akka streams 的文档。要从故障中恢复,您可以使用名为“recover”的运算符或类似“Restart*”的运算符 sources/sinks/flows.
重启阶段是最通用的,也是最强大的,所以看看吧。监督也存在,它会自动处理这些事情,但它必须得到给定操作员的支持。 (大多数内置运算符都这样做,但不是全部,请查看他们的文档)