有没有办法在抛出异常时获取被映射的项目 akka 流?

Is there a way to get the item being mapped when an exception is thrown akka streams?

如果抛出异常,我希望能够记录被映射项目的某些属性,所以我想知道有没有办法在抛出异常时获取被映射的项目 akka 流?

如果我有:

val decider: Supervision.Decider = { e =>
//val item = getItemThatCausedException
  logger.error("Exception in stream with itemId:"+item.id, e)
  Supervision.Resume
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

Source(List(item1,item2,item3)).map { item =>
  if (item.property < 0) {
    throw new RuntimeException("Error")
  } else {
    i
  }
}

有没有办法在 Supervision.Decider 或地图完成后获取失败的项目?

不是 Supervision.Decide 但你可以用不同的方式实现它。

查看此程序:

object Streams extends App{

  implicit val system = ActorSystem("test")

  implicit val mat = ActorMaterializer()

  val source = Source(List("1", "2", "3")).map { item =>
    Try {
      if (item == "2") {
        throw new RuntimeException("Error")
      } else {
        item
      }
    }
  }
  source
    .alsoTo(
      Flow[Try[String]]
        .filter(_.isFailure)
        .to(Sink.foreach(t => println("failure: " + t))))
    .to(
      Flow[Try[String]]
        .filter(_.isSuccess)
        .to(Sink.foreach(t => println("success " + t)))).run()

}

输出:

success Success(1)
failure: Failure(java.lang.RuntimeException: Error)
success Success(3)

您可以使用 Supervision.Decider 来记录这些属性。

object Test extends App {

  implicit val system = ActorSystem("test")

  implicit val mat = ActorMaterializer()

  val testSupervisionDecider: Supervision.Decider = {
    case ex: RuntimeException =>
      println(s"some run time exception ${ex.getMessage}")
      Supervision.Resume
    case ex: Exception =>
     //if you want to stop the stream
   Supervision.Stop
  }

  val source = Source(List("1", "2", "3")).map { item =>
    if (item == "2") {
      throw new RuntimeException(s"$item")
    } else {
      item
    }
  }

  source
    .to(Sink.foreach(println(_)))
    .withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
    .run

}

输出是:

1
some run time exception 2
3

这有点令人费解,但您可以通过将映射函数包装在流中并使用 flatMapConcat 来做到这一点,如下所示:

Source(List(item1, item2, item3)).flatMapConcat { item =>
  Source(List(item))
    .map(mapF)
    .withAttributes(ActorAttributes.supervisionStrategy { e: Throwable =>
      logger.error("Exception in stream with itemId:" + item.id, e)
      Supervision.Resume
    })
}

def mapF(item: Item) =
  if (item.property < 0) {
    throw new RuntimeException("Error")
  } else {
    i
  }

这是可能的,因为每个流阶段都可以有自己的监督策略。