Akka Streams override KillSwitch
I have the following graph:
case class FlowFactory() {
  val reactiveConnection = ???
  val serviceRabbitConnection = ???
  val switch = KillSwitches.single[Routed]
  val stream: RunnableGraph[UniqueKillSwitch] = RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
    import GraphDSL.Implicits._
    val in = builder.add(Source.fromPublisher(reactiveConnection.consume(???)))
    val context = builder.add(contextFlow(serviceRabbitConnection))
    val inflate = builder.add(inflateFlow())
    val compute = builder.add(computeFlow())
    val out = builder.add(Sink.fromSubscriber(reactiveConnection.publish()))
    in ~> context ~> inflate ~> compute ~> sw ~> out
    ClosedShape
  })
  val killSwitch = stream.run()
  killSwitch.shutdown()
}
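When I shut down the stream, I also need to close the following connections: reactiveConnection and serviceRabbitConnection.
How can I do that? Is there a simple way to override the KillSwitch's shutdown() method?
Is there a method that gets called when the stream is closed, such as onComplete() or onClose()?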
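You can run a callback when the stream terminates by attaching an additional sink (Sink.onComplete) to the stream. The callback is invoked once, with a Success when the stream completes or a Failure when it fails, so it is a natural place to close the connections.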
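import scala.util.{Failure, Success}

val sink1 = Sink.fromSubscriber(reactiveConnection.publish())
// Invoked once when the stream completes or fails
val sink2 = Sink.onComplete {
  case Success(_) => println("success!")
  case Failure(e) => println(s"failure - $e")
}
// Broadcast every element to both sinks
val out = builder.add(Sink.combine(sink1, sink2)(Broadcast(_)))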
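As an alternative to a second sink, the same cleanup can be hung off watchTermination, which materializes a Future[Done] that completes when the stage it is attached to sees the stream terminate. The following is only a minimal sketch under assumptions: FakeConnection and its close() method are hypothetical stand-ins for reactiveConnection and serviceRabbitConnection (the question does not show their API), the linear Source.tick pipeline replaces the GraphDSL graph for brevity, and it assumes Akka 2.6, where an implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the real connections; only close() matters here.
final class FakeConnection(val name: String) {
  @volatile var closed = false
  def close(): Unit = closed = true
}

object CloseOnTermination {
  def run(): (FakeConnection, FakeConnection) = {
    implicit val system: ActorSystem = ActorSystem("demo")
    import system.dispatcher // ExecutionContext for the Future callback

    val reactiveConnection = new FakeConnection("reactive")
    val serviceRabbitConnection = new FakeConnection("service")

    // Materialize both the kill switch and a termination Future.
    val (killSwitch, done) = Source
      .tick(0.seconds, 100.millis, "msg")
      .viaMat(KillSwitches.single)(Keep.right)
      .watchTermination()(Keep.both)
      .to(Sink.ignore)
      .run()

    // Close the connections whether the stream completed or failed.
    val cleanedUp = done.andThen { case _ =>
      reactiveConnection.close()
      serviceRabbitConnection.close()
    }

    killSwitch.shutdown()
    Await.ready(cleanedUp, 5.seconds)
    Await.ready(system.terminate(), 5.seconds)
    (reactiveConnection, serviceRabbitConnection)
  }
}
```

With this shape the caller keeps using killSwitch.shutdown() unchanged, and the cleanup also runs if the stream ends on its own or fails. Note that the Future reflects termination as seen at the point where watchTermination is attached, so placing it after the kill switch, as here, is a deliberate choice.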