Akka Streams: override KillSwitch

I have the following graph:

case class FlowFactory() {

  val reactiveConnection = ???
  val serviceRabbitConnection = ???

  val switch = KillSwitches.single[Routed]

  val stream: RunnableGraph[UniqueKillSwitch] = RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
    import GraphDSL.Implicits._

    val in = builder.add(Source.fromPublisher(reactiveConnection.consume(???)))
    val context = builder.add(contextFlow(serviceRabbitConnection))
    val inflate = builder.add(inflateFlow())
    val compute = builder.add(computeFlow())
    val out = builder.add(Sink.fromSubscriber(reactiveConnection.publish()))

    in ~> context ~> inflate ~> compute ~> sw ~> out

    ClosedShape
  })

  val killSwitch = stream.run()

  killSwitch.shutdown()

}

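When I shut down the stream, I also need to close the following connections: reactiveConnection and serviceRabbitConnection. How can I achieve this? Is there a simple way to override the KillSwitch's shutdown() method? Is there a method that is called when the stream shuts down, such as onComplete() or onClose()?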

You can run a callback when the stream terminates by attaching an additional sink (Sink.onComplete):

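  val sink1 = Sink.fromSubscriber(reactiveConnection.publish())
  // Runs once when the stream completes or fails; close both connections here.
  val sink2 = Sink.onComplete[Routed] {
    case Success(_) ⇒ println("success!")
    case Failure(e) ⇒ println(s"failure - $e")
  }

  // Explicit element types keep the compiler from inferring Nothing.
  val out = builder.add(Sink.combine(sink1, sink2)(Broadcast[Routed](_)))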
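
Another option that may fit here is watchTermination, which exposes a Future[Done] next to the kill switch so the cleanup can live outside the graph. Below is a minimal, self-contained sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer). FakeConnection and its close() method are hypothetical stand-ins, since the question does not show how reactiveConnection and serviceRabbitConnection are actually closed.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KillSwitchCleanup {

  // Hypothetical stand-in for a connection; the real close API is an assumption.
  final class FakeConnection(val name: String) {
    @volatile var closed: Boolean = false
    def close(): Unit = closed = true
  }

  // Runs a stream, shuts it down through the kill switch, and reports
  // whether the connection was closed by the termination callback.
  def runAndCleanUp(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("cleanup-demo")
    import system.dispatcher

    val connection = new FakeConnection("reactiveConnection")

    try {
      // Materialize both the kill switch and a future that completes
      // when the stream terminates (normally or with a failure).
      val (killSwitch, done): (UniqueKillSwitch, Future[Done]) =
        Source
          .tick(0.seconds, 100.millis, "tick")
          .viaMat(KillSwitches.single)(Keep.right)
          .watchTermination()(Keep.both)
          .to(Sink.ignore)
          .run()

      // Close the connection on termination, whatever the outcome.
      val cleanedUp: Future[Done] = done.andThen { case _ => connection.close() }

      killSwitch.shutdown()
      Await.result(cleanedUp, 3.seconds)
      connection.closed
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"connection closed: ${runAndCleanUp()}")
}
```

In the original graph the same idea would mean placing watchTermination after the kill switch and closing both connections in the callback; since the cleanup is tied to stream termination rather than to shutdown() itself, it also runs when the stream fails or completes on its own.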