如何在akka流中抛出异常?

How to throw an exception in akka stream?

我想抛出如下异常:

  Source.empty
      .map {
        throw new RuntimeException("Failed")
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }  

但是onComplete方法中没有出现异常。它打印

Exception in thread "main" java.lang.RuntimeException: Failed
    at com.sweetsoft.App$.main(App.scala:30)
    at com.sweetsoft.App.main(App.scala) 

如何抛出异常,使流停止并出现在末尾?

Akka 内置错误处理:Akka Supervision Strategies

val testSupervisionDecider: Supervision.Decider = {
        case ex: java.lang.RuntimeException =>
          println(s"some run time exception ${ex.getMessage}")
          Supervision.Stop
        case ex: Exception =>
          println("Exception occurred and stopping stream",
            ex)
          Supervision.Stop
      }

并且您可以将监督决策者用作

val list = List.range(1, 100)

  Source(list).map { item =>
    if ((item % 2) == 0) {
      throw new RuntimeException(s"$item")
    } else {
      item
    }
  }.withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
    .runWith(Sink.foreach(println)).onComplete {
    case Success(_) =>
      println()
    case Failure(e) =>
      println(s"Thrown ${e.getMessage}")
  }