有没有办法在抛出异常时获取被映射的项目 akka 流?
Is there a way to get the item being mapped when an exception is thrown akka streams?
如果抛出异常,我希望能够记录被映射项目的某些属性,所以我想知道有没有办法在抛出异常时获取被映射的项目 akka 流?
如果我有:
val decider: Supervision.Decider = { e =>
//val item = getItemThatCausedException
logger.error("Exception in stream with itemId:"+item.id, e)
Supervision.Resume
}
implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
Source(List(item1,item2,item3)).map { item =>
if (item.property < 0) {
throw new RuntimeException("Error")
} else {
i
}
}
有没有办法在 Supervision.Decider 或地图完成后获取失败的项目?
不是 Supervision.Decide
但你可以用不同的方式实现它。
查看此程序:
object Streams extends App{
implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
val source = Source(List("1", "2", "3")).map { item =>
Try {
if (item == "2") {
throw new RuntimeException("Error")
} else {
item
}
}
}
source
.alsoTo(
Flow[Try[String]]
.filter(_.isFailure)
.to(Sink.foreach(t => println("failure: " + t))))
.to(
Flow[Try[String]]
.filter(_.isSuccess)
.to(Sink.foreach(t => println("success " + t)))).run()
}
输出:
success Success(1)
failure: Failure(java.lang.RuntimeException: Error)
success Success(3)
您可以使用 Supervision.Decider
来记录这些属性。
object Test extends App {
implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
val testSupervisionDecider: Supervision.Decider = {
case ex: RuntimeException =>
println(s"some run time exception ${ex.getMessage}")
Supervision.Resume
case ex: Exception =>
//if you want to stop the stream
Supervision.Stop
}
val source = Source(List("1", "2", "3")).map { item =>
if (item == "2") {
throw new RuntimeException(s"$item")
} else {
item
}
}
source
.to(Sink.foreach(println(_)))
.withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
.run
}
输出是:
1
some run time exception 2
3
这有点令人费解,但您可以通过将映射函数包装在流中并使用 flatMapConcat
来做到这一点,如下所示:
Source(List(item1, item2, item3)).flatMapConcat { item =>
Source(List(item))
.map(mapF)
.withAttributes(ActorAttributes.supervisionStrategy { e: Throwable =>
logger.error("Exception in stream with itemId:" + item.id, e)
Supervision.Resume
})
}
def mapF(item: Item) =
if (item.property < 0) {
throw new RuntimeException("Error")
} else {
i
}
这是可能的,因为每个流阶段都可以有自己的监督策略。
如果抛出异常,我希望能够记录被映射项目的某些属性,所以我想知道有没有办法在抛出异常时获取被映射的项目 akka 流?
如果我有:
val decider: Supervision.Decider = { e =>
//val item = getItemThatCausedException
logger.error("Exception in stream with itemId:"+item.id, e)
Supervision.Resume
}
implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
Source(List(item1,item2,item3)).map { item =>
if (item.property < 0) {
throw new RuntimeException("Error")
} else {
i
}
}
有没有办法在 Supervision.Decider 或地图完成后获取失败的项目?
不是 Supervision.Decide
但你可以用不同的方式实现它。
查看此程序:
object Streams extends App{
implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
val source = Source(List("1", "2", "3")).map { item =>
Try {
if (item == "2") {
throw new RuntimeException("Error")
} else {
item
}
}
}
source
.alsoTo(
Flow[Try[String]]
.filter(_.isFailure)
.to(Sink.foreach(t => println("failure: " + t))))
.to(
Flow[Try[String]]
.filter(_.isSuccess)
.to(Sink.foreach(t => println("success " + t)))).run()
}
输出:
success Success(1)
failure: Failure(java.lang.RuntimeException: Error)
success Success(3)
您可以使用 Supervision.Decider
来记录这些属性。
object Test extends App {
implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
val testSupervisionDecider: Supervision.Decider = {
case ex: RuntimeException =>
println(s"some run time exception ${ex.getMessage}")
Supervision.Resume
case ex: Exception =>
//if you want to stop the stream
Supervision.Stop
}
val source = Source(List("1", "2", "3")).map { item =>
if (item == "2") {
throw new RuntimeException(s"$item")
} else {
item
}
}
source
.to(Sink.foreach(println(_)))
.withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
.run
}
输出是:
1
some run time exception 2
3
这有点令人费解,但您可以通过将映射函数包装在流中并使用 flatMapConcat
来做到这一点,如下所示:
Source(List(item1, item2, item3)).flatMapConcat { item =>
Source(List(item))
.map(mapF)
.withAttributes(ActorAttributes.supervisionStrategy { e: Throwable =>
logger.error("Exception in stream with itemId:" + item.id, e)
Supervision.Resume
})
}
def mapF(item: Item) =
if (item.property < 0) {
throw new RuntimeException("Error")
} else {
i
}
这是可能的,因为每个流阶段都可以有自己的监督策略。