ReactiveX:异步行为是如何实现的

ReactiveX: How is Asynchronous Behavior Achieved

RxJS Observables 是可以 return 零到无限值的函数,随着时间的推移,同步或异步。

但是异步行为究竟是如何实现的呢?它是以某种方式使用单独的线程,还是使用 Web API 和 JS 事件循环?

异步行为源于运行时环境的 API。

例如,implementation of fromEvent calls addEventListener and the implementation of AjaxObservable 创建一个 XMLHttpRequest

RxJS 可以使用调度程序将异步行为引入同步可观察对象。例如,使用 of 创建的可观察对象同步发出:

Rx.Observable
  .of(1, 2, 3)
  .subscribe(value => console.log("emitted", value));
console.log("subscribed");
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

但是,如果指定了调度程序,将使用调度程序发出值(通常是异步的):

Rx.Observable
  .of(1, 2, 3, Rx.Scheduler.asap)
  .subscribe(value => console.log("emitted", value));
console.log("subscribed");
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

RxJS 中有几个调度器,它们都是使用异步浏览器 API 实现的:

当然,RxJS 也可以与 Node.js 一起使用,并且在那种环境下,会使用 Node 的 API。