在 Akka Streams 的非线性图上恢复后如何工作监督策略
How to work Supervision strategy after recover at non-linear graphs in Akka Streams
我使用 recover 方法来捕获 Akka Streams 中的错误或异常。它适用于线性图,但不适用于非线性图(例如 Broadcast、Zip)。
之所以带有fan-in或fan-out的Graph永远等待失败端口,所以Akka Streams挂掉了。
该解决方案在 https://blog.softwaremill.com/akka-streams-pitfalls-to-avoid-part-2-f93e60746c58.
的第 9 节中进行了描述
post 在 Flow 中使用 Try monad 和 catch Exception。这样可行。但是我使用恢复方法,因为我有很多流程,我想在一个地方捕获错误。
我准备了下面的例子,但是没用...
Source(1 to 10)
.via(graph)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int].map(a => if (a == 5) throw new Exception("5 is invalid") else a.toString).map(str => Try(str)).recover {
case e => Failure[String](e)
}
}
private def safeFlow: Flow[Int, String, NotUsed] = Flow[Int].map( "hello" +_)
def graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Try[String], String])
bcast.out(0) ~> dangerFlow ~> zip.in0
bcast.out(1) ~> safeFlow ~> zip.in1
FlowShape(bcast.in, zip.out)
})
结果:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
我预计:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
(Failure(java.lang.Exception: 5 is invalid),hello5)
(Success(6),hello6)
(Success(7),hello7)
(Success(8),hello8)
(Success(9),hello9)
(Success(10),hello10)
请告诉我任何解决方案。谢谢。
首先,让我们添加几个打印语句以更清楚地看到发生了什么:一个在流完成时...
val stream =
Source(1 to 10)
.via(graph)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
// ...
stream.onComplete { _ =>
println("Done!") // <---
system.terminate()
}
...recover
块中的另一个:
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int]
.map(a => if (a == 5) throw new Exception("5 is invalid") else a.toString)
.map(str => Try(str))
.recover {
case e =>
println("Recovering...") // <---
Failure[String](e)
}
}
运行 流的输出是...
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
// no "Recovering..." or "Done!"
...显示未调用 recover
方法并且流从未完成。流死锁的原因与 blog 描述的相同:
[dangerFlow
] fails and does not emit element to Zip
. It then resumes demanding next element from broadcast
. However, for broadcast
to emit element the demand must be signaled from all outputs.
Zip
receives only one element (from safeFlow
) and waits forever for the second element. Zip
emits only when both inputs have value.
恢复监管策略是recover
没有被调用的原因。删除该策略...
val stream =
Source(1 to 10)
.via(graph)
//.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
...和 运行 流再次产生以下输出:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
Recovering...
(Failure(java.lang.Exception: 5 is invalid),hello5)
Done!
现在调用 recover
,流完成,但流被截断。这是因为 recover
完成流:
recover
allows you to emit a final element and then complete the stream on an upstream failure.
要获得所需的行为,您必须按如下方式使用 Try
:
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int].map(a => if (a == 5) Failure(new Exception("5 is invalid")) else Try(a.toString))
}
运行 带有上述 Flow
的流产生以下内容:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
(Failure(java.lang.Exception: 5 is invalid),hello5)
(Success(6),hello6)
(Success(7),hello7)
(Success(8),hello8)
(Success(9),hello9)
(Success(10),hello10)
Done!
我使用 recover 方法来捕获 Akka Streams 中的错误或异常。它适用于线性图,但不适用于非线性图(例如 Broadcast、Zip)。 之所以带有fan-in或fan-out的Graph永远等待失败端口,所以Akka Streams挂掉了。 该解决方案在 https://blog.softwaremill.com/akka-streams-pitfalls-to-avoid-part-2-f93e60746c58.
的第 9 节中进行了描述post 在 Flow 中使用 Try monad 和 catch Exception。这样可行。但是我使用恢复方法,因为我有很多流程,我想在一个地方捕获错误。
我准备了下面的例子,但是没用...
Source(1 to 10)
.via(graph)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int].map(a => if (a == 5) throw new Exception("5 is invalid") else a.toString).map(str => Try(str)).recover {
case e => Failure[String](e)
}
}
private def safeFlow: Flow[Int, String, NotUsed] = Flow[Int].map( "hello" +_)
def graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Try[String], String])
bcast.out(0) ~> dangerFlow ~> zip.in0
bcast.out(1) ~> safeFlow ~> zip.in1
FlowShape(bcast.in, zip.out)
})
结果:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
我预计:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
(Failure(java.lang.Exception: 5 is invalid),hello5)
(Success(6),hello6)
(Success(7),hello7)
(Success(8),hello8)
(Success(9),hello9)
(Success(10),hello10)
请告诉我任何解决方案。谢谢。
首先,让我们添加几个打印语句以更清楚地看到发生了什么:一个在流完成时...
val stream =
Source(1 to 10)
.via(graph)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
// ...
stream.onComplete { _ =>
println("Done!") // <---
system.terminate()
}
...recover
块中的另一个:
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int]
.map(a => if (a == 5) throw new Exception("5 is invalid") else a.toString)
.map(str => Try(str))
.recover {
case e =>
println("Recovering...") // <---
Failure[String](e)
}
}
运行 流的输出是...
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
// no "Recovering..." or "Done!"
...显示未调用 recover
方法并且流从未完成。流死锁的原因与 blog 描述的相同:
[
dangerFlow
] fails and does not emit element toZip
. It then resumes demanding next element frombroadcast
. However, forbroadcast
to emit element the demand must be signaled from all outputs.
Zip
receives only one element (fromsafeFlow
) and waits forever for the second element.Zip
emits only when both inputs have value.
恢复监管策略是recover
没有被调用的原因。删除该策略...
val stream =
Source(1 to 10)
.via(graph)
//.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)
...和 运行 流再次产生以下输出:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
Recovering...
(Failure(java.lang.Exception: 5 is invalid),hello5)
Done!
现在调用 recover
,流完成,但流被截断。这是因为 recover
完成流:
recover
allows you to emit a final element and then complete the stream on an upstream failure.
要获得所需的行为,您必须按如下方式使用 Try
:
private def dangerFlow: Flow[Int, Try[String], NotUsed] = {
Flow[Int].map(a => if (a == 5) Failure(new Exception("5 is invalid")) else Try(a.toString))
}
运行 带有上述 Flow
的流产生以下内容:
(Success(1),hello1)
(Success(2),hello2)
(Success(3),hello3)
(Success(4),hello4)
(Failure(java.lang.Exception: 5 is invalid),hello5)
(Success(6),hello6)
(Success(7),hello7)
(Success(8),hello8)
(Success(9),hello9)
(Success(10),hello10)
Done!