使用调度程序按数据中定义的时间延迟 rxjs 5 可观察发射

Delaying rxjs 5 Observable emission by time defined in data with scheduler

我想按照数据中定义的延迟延迟数据流:

Rx.Observable.from([
  {message: "one", delay: 100}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 10500}
]).subscribe((e) => console.log(e.message))
  1. 它应该在 100 毫秒后记录 "one",在 500 毫秒后记录 "two"。
  2. 我希望能够在消息“三”发出之前取消计时器。

我将如何定义一个 rxjs 5 调度器来完成它?

根据如何你想延误,你可以选择几条路线:

延迟每次发射之间的给定时间

使用 .concatMap 每次发射都必须等待延迟的上一次发射在下一次之前完成:

Rx.Observable.from([
  {message: "one", delay: 500}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 500}
])
.concatMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

每次发射都在前一次发射完成后到达。

延迟应与流的开始有关:

使用 .mergeMap 所有排放将在它们到达时开始,它们的延迟将在那个时间开始。

Rx.Observable.from([
  {message: "one", delay: 500}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 500}
])
.mergeMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

请注意,所有三个排放同时到达。

通过使用标准 Rx 运算符,您的取消支持是内置的。只需取消订阅流,所有未决的未来排放都将被忽略。