如何停止可运行图

How to stop runnable graph

开始使用 akka 流。我有一张类似于从 here 复制的图表:

val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)    
val g =    RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
      (topHS, bottomHS) =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Int](2))
      Source.single(1) ~> broadcast.in       

  broadcast.out(0) ~> sharedDoubler ~> topHS.in
  broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
  ClosedShape
})

我可以使用 g.run() 运行 图表 但我怎么能阻止它呢? 在什么情况下我应该这样做(除了不使用 - 商业明智)? 该图包含在一个演员中。如果 Actor 崩溃,底层 actor 的图表会发生什么?它也会终止吗?

有一个 KillSwitch 功能应该适合您。检查另一个 SO 问题的答案:

documentation所述,从图外完成图的方法是KillSwitch。您从文档中复制的示例不太适合说明这种方法,因为源只是一个元素,当您 运行 时,流将很快完成。让我们调整图表以更轻松地查看 KillSwitch 的实际效果:

val topSink = Sink.foreach(println)
val bottomSink = Sink.foreach(println)
val sharedDoubler = Flow[Int].map(_ * 2)
val killSwitch = KillSwitches.single[Int]

val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) {
  implicit builder => (topS, bottomS, switch) =>

  import GraphDSL.Implicits._

  val broadcast = builder.add(Broadcast[Int](2))
  Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in

  broadcast.out(0) ~> sharedDoubler ~> topS.in
  broadcast.out(1) ~> sharedDoubler ~> bottomS.in
  ClosedShape
})

val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch)
Thread.sleep(1000)
res._3.shutdown()

源现在包含一百万个元素,接收器现在打印广播的元素。在我们调用 shutdown 完成流之前,流 运行s 一秒钟,这不足以搅动所有一百万个元素。

如果您 运行 一个演员内部的流,为 运行 流创建的底层演员(或多个演员)的生命周期是否与 "enclosing" actor 取决于 materializer 的创建方式。阅读 documentation 了解更多信息。 Colin Breck 的以下博客 post 关于使用 actor 和 KillSwitch 来管理流的生命周期也很有帮助:http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/