Akka Streams 处理来自第三方 GraphStage 的异常
Akka Streams handling an Exception from a third party GraphStage
您好,我想知道是否有人可以帮助我理解第三方创建的 GraphStage 的异常处理。
我正在使用抛出异常并停止流的 GraphStage。我想做的是记录有关错误的所有信息(在失败之前传递到 GraphStage 的所有信息)并继续处理。
我试过 recover 和 supervision strategy 但它们不允许流继续。
这是一个演示我的问题的示例 GraphStage。我明确抛出异常,这可能不是最佳做法。
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
class TestStage extends GraphStage[FlowShape[Int, Int]] {
private val in = Inlet[Int]("Test.in")
private val out = Outlet[Int]("Test.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush(): Unit = {
val num = grab(in)
if (num == 5) {
throw new Exception(s"Number is 5")
}
push(out, num)
}
override def onPull(): Unit = pull(in)
}
}
Source(1 to 10)
.via(Flow.fromGraph(new TestStage))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.recover {
case e: Exception => e.getMessage
}
.runForeach(println)
这个不使用 GraphStage 的示例会继续处理。所以这似乎是一个
从 GraphStage 抛出的异常需要区别对待吗?
Source(1 to 10)
.map {
case 5 => throw new Exception("5 is bad")
case n => n
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
感谢您的帮助
看起来这是一些 akka 流阶段的记录行为
The operators that support supervision strategies are explicitly documented to do so, if there is nothing in the documentation of an operator saying that it adheres to the supervision strategy it means it fails rather than applies supervision.
或者这个issue.
您好,我想知道是否有人可以帮助我理解第三方创建的 GraphStage 的异常处理。
我正在使用抛出异常并停止流的 GraphStage。我想做的是记录有关错误的所有信息(在失败之前传递到 GraphStage 的所有信息)并继续处理。 我试过 recover 和 supervision strategy 但它们不允许流继续。
这是一个演示我的问题的示例 GraphStage。我明确抛出异常,这可能不是最佳做法。
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
class TestStage extends GraphStage[FlowShape[Int, Int]] {
private val in = Inlet[Int]("Test.in")
private val out = Outlet[Int]("Test.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def onPush(): Unit = {
val num = grab(in)
if (num == 5) {
throw new Exception(s"Number is 5")
}
push(out, num)
}
override def onPull(): Unit = pull(in)
}
}
Source(1 to 10)
.via(Flow.fromGraph(new TestStage))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.recover {
case e: Exception => e.getMessage
}
.runForeach(println)
这个不使用 GraphStage 的示例会继续处理。所以这似乎是一个 从 GraphStage 抛出的异常需要区别对待吗?
Source(1 to 10)
.map {
case 5 => throw new Exception("5 is bad")
case n => n
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
感谢您的帮助
看起来这是一些 akka 流阶段的记录行为
The operators that support supervision strategies are explicitly documented to do so, if there is nothing in the documentation of an operator saying that it adheres to the supervision strategy it means it fails rather than applies supervision.
或者这个issue.