理解 monix 中的观察者
Understanding observer in monix
我正在阅读 Monix documentation 有关观察者的广告,我遇到了以下示例:
Or you can quickly build an Observer that only logs the events that it
receives. We’ll use this in other samples:
import monix.reactive.Observer
val out = Observer.dump("O")
// out: Observer.Sync[Any]
out.onNext(1)
//=> 0: O-->1
// res0: Ack = Continue
out.onNext(2)
//=> 1: O-->2
// res0: Ack = Continue
out.onComplete()
//=> 2: O completed
然而下一个非法示例:
Feeding two elements, then stopping. This is NOT legal:
// BAD SAMPLE
observer.onNext(1)
observer.onNext(2)
observer.onComplete()
所以我们可以看到相同的 onNext -> onNext -> onComplete
链。不合法吗?为什么?
在您链接的文档中,在示例之后直接进行了解释
这是合法的做法:
observer.onNext(1).map {
case Continue =>
// We have permission to continue
observer.onNext(2)
// No back-pressure required here
observer.onComplete()
Stop
case Stop =>
// Nothing else to do
Stop
}
正如您在评论中看到的那样,问题是 背压。那么为什么有一个例子,使用.dump
似乎是非法的?
注意该示例中的注释:
//=> 0: O-->1
// res0: Ack = Continue
这些评论显示了如果你在 Scala REPL 中 运行 你会得到什么。当你输入一个表达式并点击 return 时,REPL 会打印类似 res0
的内容,让你知道最后一个命令的 return 值是什么。
所以这个例子是在演示:
- 从 REPL 喂养观察者
- 每个
.onNext
已完成 Continue
以这种方式编写一个向观察者提供数据的程序是不正确的,但这是 正确 t运行 [=36] 的描述=]合法执行喂食观察者
你可以在Contract section下面看到背压相关的规则:
- Back-pressure: each onNext call MUST wait on a Continue result returned by the Future[Ack] of the previous onNext call.
- Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.
这是一个很好的重点,因为优雅的背压管理是反应流的一大承诺。
我正在阅读 Monix documentation 有关观察者的广告,我遇到了以下示例:
Or you can quickly build an Observer that only logs the events that it receives. We’ll use this in other samples:
import monix.reactive.Observer val out = Observer.dump("O") // out: Observer.Sync[Any] out.onNext(1) //=> 0: O-->1 // res0: Ack = Continue out.onNext(2) //=> 1: O-->2 // res0: Ack = Continue out.onComplete() //=> 2: O completed
然而下一个非法示例:
Feeding two elements, then stopping. This is NOT legal:
// BAD SAMPLE observer.onNext(1) observer.onNext(2) observer.onComplete()
所以我们可以看到相同的 onNext -> onNext -> onComplete
链。不合法吗?为什么?
在您链接的文档中,在示例之后直接进行了解释
这是合法的做法:
observer.onNext(1).map {
case Continue =>
// We have permission to continue
observer.onNext(2)
// No back-pressure required here
observer.onComplete()
Stop
case Stop =>
// Nothing else to do
Stop
}
正如您在评论中看到的那样,问题是 背压。那么为什么有一个例子,使用.dump
似乎是非法的?
注意该示例中的注释:
//=> 0: O-->1
// res0: Ack = Continue
这些评论显示了如果你在 Scala REPL 中 运行 你会得到什么。当你输入一个表达式并点击 return 时,REPL 会打印类似 res0
的内容,让你知道最后一个命令的 return 值是什么。
所以这个例子是在演示:
- 从 REPL 喂养观察者
- 每个
.onNext
已完成Continue
以这种方式编写一个向观察者提供数据的程序是不正确的,但这是 正确 t运行 [=36] 的描述=]合法执行喂食观察者
你可以在Contract section下面看到背压相关的规则:
- Back-pressure: each onNext call MUST wait on a Continue result returned by the Future[Ack] of the previous onNext call.
- Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.
这是一个很好的重点,因为优雅的背压管理是反应流的一大承诺。