Akka Streams 中的可观察延迟
Observable defer in Akka Streams
我来自 ReactiveX,我们有运算符 defer,以便创建一个 Observable
并在我们拥有订阅者后获得发射值。
在 Akka Streams 中,我想知道是否存在类似的东西:
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source(range)
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
有了这段代码,甚至在我们执行 运行() 之前,更改范围的值,因为已经创建了蓝图,所以值没有改变,并发出 0 到 10。
Akka Streams 中是否有类似 Observable.defer
的内容?
解决方案:
我找到了解决方案,解决方案是使用 lazy 关键字,其中我们提供了一个函数,一旦我们 运行 流就会执行。
我会保留这个问题,以防万一有更好的方法或其他人有同样的问题
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source.lazily(() => Source(range))
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
此致。
最简单的方法可能是 Source.fromIterator(() => List(1).iterator)
或类似的方法。在 Akka Streams API 中,我们选择尝试保留最少的运算符集,因此有时您可能会遇到这样的情况,在单行代码中可以实现相同的效果,但不会有名称类似的直接对应项在延迟的情况下。如果您认为这是一件很常见的事情,请在 github.com/akka/akka 上告诉我们,我们可以考虑将其添加为 API.
请注意,还有 fromFuture
和其他一些,虽然没有直接关系,但根据您的实际用例(尤其是与 Promise 等结合使用时)可能会有用。
我来自 ReactiveX,我们有运算符 defer,以便创建一个 Observable
并在我们拥有订阅者后获得发射值。
在 Akka Streams 中,我想知道是否存在类似的东西:
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source(range)
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
有了这段代码,甚至在我们执行 运行() 之前,更改范围的值,因为已经创建了蓝图,所以值没有改变,并发出 0 到 10。
Akka Streams 中是否有类似 Observable.defer
的内容?
解决方案:
我找到了解决方案,解决方案是使用 lazy 关键字,其中我们提供了一个函数,一旦我们 运行 流就会执行。
我会保留这个问题,以防万一有更好的方法或其他人有同样的问题
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source.lazily(() => Source(range))
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
此致。
最简单的方法可能是 Source.fromIterator(() => List(1).iterator)
或类似的方法。在 Akka Streams API 中,我们选择尝试保留最少的运算符集,因此有时您可能会遇到这样的情况,在单行代码中可以实现相同的效果,但不会有名称类似的直接对应项在延迟的情况下。如果您认为这是一件很常见的事情,请在 github.com/akka/akka 上告诉我们,我们可以考虑将其添加为 API.
请注意,还有 fromFuture
和其他一些,虽然没有直接关系,但根据您的实际用例(尤其是与 Promise 等结合使用时)可能会有用。