Akka Streams 中的可观察延迟

Observable defer in Akka Streams

我来自 ReactiveX,我们有运算符 defer,以便创建一个 Observable 并在我们拥有订阅者后获得发射值。

在 Akka Streams 中,我想知道是否存在类似的东西:

 @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source(range)
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

有了这段代码,甚至在我们执行 运行() 之前,更改范围的值,因为已经创建了蓝图,所以值没有改变,并发出 0 到 10。

Akka Streams 中是否有类似 Observable.defer 的内容?

解决方案:

我找到了解决方案,解决方案是使用 lazy 关键字,其中我们提供了一个函数,一旦我们 运行 流就会执行。

我会保留这个问题,以防万一有更好的方法或其他人有同样的问题

  @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source.lazily(() => Source(range))
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

此致。

最简单的方法可能是 Source.fromIterator(() => List(1).iterator) 或类似的方法。在 Akka Streams API 中,我们选择尝试保留最少的运算符集,因此有时您可能会遇到这样的情况,在单行代码中可以实现相同的效果,但不会有名称类似的直接对应项在延迟的情况下。如果您认为这是一件很常见的事情,请在 github.com/akka/akka 上告诉我们,我们可以考虑将其添加为 API.

请注意,还有 fromFuture 和其他一些,虽然没有直接关系,但根据您的实际用例(尤其是与 Promise 等结合使用时)可能会有用。