在 HTTP 超时时保持订阅 RXJS 主题

Remain subscribed to an RXJS subject on HTTP timeout

我有一个 RXJS 主题,当我向它发送消息时(使用 next),它会进行 HTTP 调用并在 5 秒后超时。我已经关闭了后端,所以它总是超时。超时导致调用订阅错误函数。完美。

但是,当我第二次向对象发射时,我看到对象有 0 个观察者。超时错误意外地移除了 RXJS 主题的所有当前观察者。但是,我不想要这种行为。我希望所有观察员都保持订阅状态。

我该如何解决这个问题?

重要的代码行是...

console.log(this.getDocumentsSubject.observers.length);

第一次调用时 returns 1。

但是有问题 returns 第二次调用时为 0,超时后。

完整代码如下。

// RXJS SUBJECT AND ASSOCIATED OBSERVABLE

private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {

let query = elasticFilteredQuery.toString();

// noinspection UnnecessaryLocalVariableJS
let restStream = this.http.post(BASE_URL + '_search', query, this.options)
  .do(() => {
    console.log('firing post');
  })
  .timeout(Config.http.timeout, new Error('timeout'))
  .map((response: any) => {

    return {
      documents: response.json().hits.hits.map((hit: any) => {
        return this.createDocumentResult(hit);
      }),
      numDocuments: response.json().hits.total,
      elasticFilteredQuery
    };
  });

return restStream;
}).publish().refCount();



// EMIT TO RXJS SUBJECT - this is being called at the correct times

public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
  this.getDocumentsSubject.next(elasticFilteredQuery);
  console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}


// SUBSCRIPTION

this.esQueryService.getDocuments$.subscribe((response: any) => {
    console.log('XXXXX NEXT');
    ...
  }, (err: any) => {
    console.log('XXXXX error');
    ...
  }, () => {
    console.log('XXXXX completed');
  }
);

这个答案完全基于这样的假设,即您想将 getDocuments$ 用作永久流,每当有新查询 出现时,它都会发出新数据 . (如果不是这种情况,那么答案可能对您没有帮助)

然而,这不会像这样工作,因为每当在流上发出错误时,Stream 本质上是 dead。 (另见 回答)

这是您的 rxjs 架构中的一个基本问题:应该在一次性进程(如 rest-call)上抛出错误,但是数据流(如 documents$)通常是那里确保任何最终错误都已得到处理,并且永久流中发出的任何内容 (next'ed) 都是可靠且有效的数据。

所以我的建议是使用 .catch() 优雅地处理错误,并简单地跳过此调用的文档的发送。


有点跑题,可能不相关:

在任何情况下,休息呼叫硬超时都是非常不寻常的情况,如果您想节省服务器电源,那么我建议在服务器端处理此问题。另一个非常常见的情况是,您可能只想在触发下一个查询之前接受响应,以防止 较旧、较慢的 查询在呈现新查询后显示,如果是这种情况,那么您可以使用简单的 .takeUntil(this.getDocumentSubject):

this.http.post(BASE_URL + '_search', query, this.options)
  .takeUntil(this.getDocumentSubject)
  .do(...

作为替代方案,您可以使用 switchMap 而不是 flatMap

您所描述的是 RxJS(以及所有响应式扩展)的设计行为。这不是问题也不是错误。这就是它应该如何工作的。任何错误或完整信号都会进行递归 unsubscribe() 调用。你绝对不是第一个在 SO 上问这个问题的人。

查看类似内容:

使用 catch() 运算符有一个重要的 "catch"。此运算符允许您 重新订阅 到同一个 Observable,这可能是您不想要的,因为它可能会触发更多 HTTP 请求,然后再次失败并形成无限循环。请注意,此运算符不会忽略错误。