理解 monix 中的观察者

Understanding observer in monix

我正在阅读 Monix documentation 有关观察者的广告,我遇到了以下示例:

Or you can quickly build an Observer that only logs the events that it receives. We’ll use this in other samples:

import monix.reactive.Observer

val out = Observer.dump("O")
// out: Observer.Sync[Any]

out.onNext(1)
//=> 0: O-->1
// res0: Ack = Continue

out.onNext(2)
//=> 1: O-->2
// res0: Ack = Continue

out.onComplete()
//=> 2: O completed

然而下一个非法示例:

Feeding two elements, then stopping. This is NOT legal:

// BAD SAMPLE
observer.onNext(1)
observer.onNext(2)
observer.onComplete()

所以我们可以看到相同的 onNext -> onNext -> onComplete 链。不合法吗?为什么?

在您链接的文档中,在示例之后直接进行了解释

这是合法的做法:

observer.onNext(1).map {
  case Continue =>
    // We have permission to continue
    observer.onNext(2)
    // No back-pressure required here
    observer.onComplete()
    Stop
  case Stop =>
    // Nothing else to do
    Stop
}

正如您在评论中看到的那样,问题是 背压。那么为什么有一个例子,使用.dump似乎是非法的?

注意该示例中的注释:

//=> 0: O-->1
// res0: Ack = Continue

这些评论显示了如果你在 Scala REPL 中 运行 你会得到什么。当你输入一个表达式并点击 return 时,REPL 会打印类似 res0 的内容,让你知道最后一个命令的 return 值是什么。

所以这个例子是在演示:

  • 从 REPL 喂养观察者
  • 每个 .onNext 已完成 Continue

以这种方式编写一个向观察者提供数据的程序是不正确的,但这是 正确 t运行 [=36] 的描述=]合法执行喂食观察者

你可以在Contract section下面看到背压相关的规则:

  1. Back-pressure: each onNext call MUST wait on a Continue result returned by the Future[Ack] of the previous onNext call.
  2. Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.

这是一个很好的重点,因为优雅的背压管理是反应流的一大承诺。