如何停止可运行图
How to stop runnable graph
开始使用 akka 流。我有一张类似于从 here 复制的图表:
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
ClosedShape
})
我可以使用 g.run()
运行 图表
但我怎么能阻止它呢?
在什么情况下我应该这样做(除了不使用 - 商业明智)?
该图包含在一个演员中。如果 Actor 崩溃,底层 actor 的图表会发生什么?它也会终止吗?
有一个 KillSwitch 功能应该适合您。检查另一个 SO 问题的答案:
如documentation所述,从图外完成图的方法是KillSwitch
。您从文档中复制的示例不太适合说明这种方法,因为源只是一个元素,当您 运行 时,流将很快完成。让我们调整图表以更轻松地查看 KillSwitch
的实际效果:
val topSink = Sink.foreach(println)
val bottomSink = Sink.foreach(println)
val sharedDoubler = Flow[Int].map(_ * 2)
val killSwitch = KillSwitches.single[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) {
implicit builder => (topS, bottomS, switch) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topS.in
broadcast.out(1) ~> sharedDoubler ~> bottomS.in
ClosedShape
})
val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch)
Thread.sleep(1000)
res._3.shutdown()
源现在包含一百万个元素,接收器现在打印广播的元素。在我们调用 shutdown
完成流之前,流 运行s 一秒钟,这不足以搅动所有一百万个元素。
如果您 运行 一个演员内部的流,为 运行 流创建的底层演员(或多个演员)的生命周期是否与 "enclosing" actor 取决于 materializer 的创建方式。阅读 documentation 了解更多信息。 Colin Breck 的以下博客 post 关于使用 actor 和 KillSwitch
来管理流的生命周期也很有帮助:http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
开始使用 akka 流。我有一张类似于从 here 复制的图表:
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
ClosedShape
})
我可以使用 g.run()
运行 图表
但我怎么能阻止它呢?
在什么情况下我应该这样做(除了不使用 - 商业明智)?
该图包含在一个演员中。如果 Actor 崩溃,底层 actor 的图表会发生什么?它也会终止吗?
有一个 KillSwitch 功能应该适合您。检查另一个 SO 问题的答案:
如documentation所述,从图外完成图的方法是KillSwitch
。您从文档中复制的示例不太适合说明这种方法,因为源只是一个元素,当您 运行 时,流将很快完成。让我们调整图表以更轻松地查看 KillSwitch
的实际效果:
val topSink = Sink.foreach(println)
val bottomSink = Sink.foreach(println)
val sharedDoubler = Flow[Int].map(_ * 2)
val killSwitch = KillSwitches.single[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) {
implicit builder => (topS, bottomS, switch) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topS.in
broadcast.out(1) ~> sharedDoubler ~> bottomS.in
ClosedShape
})
val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch)
Thread.sleep(1000)
res._3.shutdown()
源现在包含一百万个元素,接收器现在打印广播的元素。在我们调用 shutdown
完成流之前,流 运行s 一秒钟,这不足以搅动所有一百万个元素。
如果您 运行 一个演员内部的流,为 运行 流创建的底层演员(或多个演员)的生命周期是否与 "enclosing" actor 取决于 materializer 的创建方式。阅读 documentation 了解更多信息。 Colin Breck 的以下博客 post 关于使用 actor 和 KillSwitch
来管理流的生命周期也很有帮助:http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/