rxjs 在错误发生后立即完成一个 observable 而不是继续

rxjs completes an observable immediately after error instead of continuing

我正在尝试通过可观察对象流式传输一组内容,但它在出现第一个错误后停止。将其视为项目数组过于简单,因为它们的行为方式相同。

  1. 我正在从项目数组创建一个可观察对象
  2. 将每个项目映射到 URL
  3. 调用 URL 作为请求承诺
  4. 在出现错误时执行 returns observable.empty() 的 catch()

使用 RxJS 5:

rx.Observable.from(array)
  .map(self.createUrl)
  .flatMap(x => {
            var options = {
            uri: url,
            headers: {
                "Content-Type": "application/json"
            };
            return rx.Observable.fromPromise(request-promise(options));
        })
  .catch(() => {
    return rx.Observable.empty();})
  .subscribe( x => console.log('success:', x),
           e => console.log('error'),
           () => console.log('complete'));

执行此序列时,代码在遇到第一个错误后停止。我怀疑 #4 中的空 observable 正在终止 observable 但我不确定为什么。

我想要的过程是处理数组中的所有项目,无论错误如何 - 最终处理所有成功的项目并在每次错误后恢复。

您只需要将 catch() 放在 flatMap 中:

rx.Observable.from(array)
  .map(self.createUrl)
  .flatMap(x => {
    var options = {
      uri: url,
      headers: {
        "Content-Type": "application/json"
      }
    };
    return rx.Observable
      .fromPromise(request-promise(options))
      .catch(() => rx.Observable.empty());
  })
  .subscribe( x => console.log('success:', x),
           e => console.log('error'),
           () => console.log('complete'));

现在当内部 Observable 发出错误时,它会立即被捕获并且不会通过 .flatMap().

传播到主流