如何从外部正确停止 Akka 流
How to properly stop Akka streams from the outside
我正在设计一个可以生成 CSV 测试数据的小工具。我想使用 Akka Streams (1.0-RC4) 来实现数据流。将有一个生成随机数的源、一个转换为 CSV 字符串、一些速率限制器和一个写入文件的接收器。
还应该有一种使用小型 REST 接口停止该工具的简洁方法。
这就是我挣扎的地方。流开始后 (RunnableFlow.run()) 似乎无法停止。 Source 和 Sink 是无限的(至少直到磁盘满了:))所以他们不会停止流。
给Source或者Sink添加控制逻辑感觉不对。也使用 ActorSystem.shutdown() 。 好的停止流的方法是什么?
好的,所以我找到了一个不错的解决方案。它已经在我眼皮底下了,只是我没有看到它。 Source.lazyEmpty
实现一个承诺,即完成后将终止 Source 及其背后的流。
剩下的问题是,如何将它包含到无穷无尽的随机数流中。我试过了Zip
。结果是没有随机数通过流,因为 lazyEmpty
从不发出值 (doh)。我尝试了 Merge
但流从未终止,因为 Merge
一直持续到 所有 来源完成。
所以我写了自己的合并。它转发来自输入端口之一的所有值,并在 any 源完成时终止。
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
}
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(), Attributes.name("StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read(p.in)) { (ctx, input, element) =>
ctx.emit(element)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(stopperMerge.in, stopperMerge.out)
}
}
}
流可以插入任何流。实现后,它将 return 一个 Promise
在完成时终止流。这是我的测试。
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
我希望这对其他人也有用。仍然让我感到困惑,为什么没有内置的方式从流外部终止流。
不完全是停止,而是限制。您可以使用 limit
或 take
.
示例来自 Streams Cookbook:
val MAX_ALLOWED_SIZE = 100
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)
// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)
您可以使用 Akka KillSwitches 中止(失败)或关闭流。
有两种类型的终止开关:
- UniqueKillSwitch 仅针对单个流。
- SharedKillSwitch 可以一次关闭多个流。
链接中提供了代码示例,但这里是使用共享终止开关中止多个流的示例:
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error
我正在设计一个可以生成 CSV 测试数据的小工具。我想使用 Akka Streams (1.0-RC4) 来实现数据流。将有一个生成随机数的源、一个转换为 CSV 字符串、一些速率限制器和一个写入文件的接收器。
还应该有一种使用小型 REST 接口停止该工具的简洁方法。
这就是我挣扎的地方。流开始后 (RunnableFlow.run()) 似乎无法停止。 Source 和 Sink 是无限的(至少直到磁盘满了:))所以他们不会停止流。
给Source或者Sink添加控制逻辑感觉不对。也使用 ActorSystem.shutdown() 。 好的停止流的方法是什么?
好的,所以我找到了一个不错的解决方案。它已经在我眼皮底下了,只是我没有看到它。 Source.lazyEmpty
实现一个承诺,即完成后将终止 Source 及其背后的流。
剩下的问题是,如何将它包含到无穷无尽的随机数流中。我试过了Zip
。结果是没有随机数通过流,因为 lazyEmpty
从不发出值 (doh)。我尝试了 Merge
但流从未终止,因为 Merge
一直持续到 所有 来源完成。
所以我写了自己的合并。它转发来自输入端口之一的所有值,并在 any 源完成时终止。
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
}
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(), Attributes.name("StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read(p.in)) { (ctx, input, element) =>
ctx.emit(element)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(stopperMerge.in, stopperMerge.out)
}
}
}
流可以插入任何流。实现后,它将 return 一个 Promise
在完成时终止流。这是我的测试。
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
我希望这对其他人也有用。仍然让我感到困惑,为什么没有内置的方式从流外部终止流。
不完全是停止,而是限制。您可以使用 limit
或 take
.
示例来自 Streams Cookbook:
val MAX_ALLOWED_SIZE = 100
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)
// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)
您可以使用 Akka KillSwitches 中止(失败)或关闭流。
有两种类型的终止开关:
- UniqueKillSwitch 仅针对单个流。
- SharedKillSwitch 可以一次关闭多个流。
链接中提供了代码示例,但这里是使用共享终止开关中止多个流的示例:
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error