在 HTTP 超时时保持订阅 RXJS 主题
Remain subscribed to an RXJS subject on HTTP timeout
我有一个 RXJS 主题,当我向它发送消息时(使用 next),它会进行 HTTP 调用并在 5 秒后超时。我已经关闭了后端,所以它总是超时。超时导致调用订阅错误函数。完美。
但是,当我第二次向对象发射时,我看到对象有 0 个观察者。超时错误意外地移除了 RXJS 主题的所有当前观察者。但是,我不想要这种行为。我希望所有观察员都保持订阅状态。
我该如何解决这个问题?
重要的代码行是...
console.log(this.getDocumentsSubject.observers.length);
第一次调用时 returns 1。
但是有问题 returns 第二次调用时为 0,超时后。
完整代码如下。
// RXJS SUBJECT AND ASSOCIATED OBSERVABLE
private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {
let query = elasticFilteredQuery.toString();
// noinspection UnnecessaryLocalVariableJS
let restStream = this.http.post(BASE_URL + '_search', query, this.options)
.do(() => {
console.log('firing post');
})
.timeout(Config.http.timeout, new Error('timeout'))
.map((response: any) => {
return {
documents: response.json().hits.hits.map((hit: any) => {
return this.createDocumentResult(hit);
}),
numDocuments: response.json().hits.total,
elasticFilteredQuery
};
});
return restStream;
}).publish().refCount();
// EMIT TO RXJS SUBJECT - this is being called at the correct times
public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
this.getDocumentsSubject.next(elasticFilteredQuery);
console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}
// SUBSCRIPTION
this.esQueryService.getDocuments$.subscribe((response: any) => {
console.log('XXXXX NEXT');
...
}, (err: any) => {
console.log('XXXXX error');
...
}, () => {
console.log('XXXXX completed');
}
);
这个答案完全基于这样的假设,即您想将 getDocuments$
用作永久流,每当有新查询 出现时,它都会发出新数据 . (如果不是这种情况,那么答案可能对您没有帮助)
然而,这不会像这样工作,因为每当在流上发出错误时,Stream 本质上是 dead。 (另见 回答)
这是您的 rxjs 架构中的一个基本问题:应该在一次性进程(如 rest-call)上抛出错误,但是数据流(如 documents$
)通常是那里确保任何最终错误都已得到处理,并且永久流中发出的任何内容 (next'ed
) 都是可靠且有效的数据。
所以我的建议是使用 .catch()
优雅地处理错误,并简单地跳过此调用的文档的发送。
有点跑题,可能不相关:
在任何情况下,休息呼叫硬超时都是非常不寻常的情况,如果您想节省服务器电源,那么我建议在服务器端处理此问题。另一个非常常见的情况是,您可能只想在触发下一个查询之前接受响应,以防止 较旧、较慢的 查询在呈现新查询后显示,如果是这种情况,那么您可以使用简单的 .takeUntil(this.getDocumentSubject)
:
this.http.post(BASE_URL + '_search', query, this.options)
.takeUntil(this.getDocumentSubject)
.do(...
作为替代方案,您可以使用 switchMap
而不是 flatMap
您所描述的是 RxJS(以及所有响应式扩展)的设计行为。这不是问题也不是错误。这就是它应该如何工作的。任何错误或完整信号都会进行递归 unsubscribe()
调用。你绝对不是第一个在 SO 上问这个问题的人。
查看类似内容:
当使用 Subjects 时,这可能会出现意外行为,因为 Subject 将自己标记为 "stopped"。见 https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56.
使用 catch()
运算符有一个重要的 "catch"。此运算符允许您 重新订阅 到同一个 Observable,这可能是您不想要的,因为它可能会触发更多 HTTP 请求,然后再次失败并形成无限循环。请注意,此运算符不会忽略错误。
我有一个 RXJS 主题,当我向它发送消息时(使用 next),它会进行 HTTP 调用并在 5 秒后超时。我已经关闭了后端,所以它总是超时。超时导致调用订阅错误函数。完美。
但是,当我第二次向对象发射时,我看到对象有 0 个观察者。超时错误意外地移除了 RXJS 主题的所有当前观察者。但是,我不想要这种行为。我希望所有观察员都保持订阅状态。
我该如何解决这个问题?
重要的代码行是...
console.log(this.getDocumentsSubject.observers.length);
第一次调用时 returns 1。
但是有问题 returns 第二次调用时为 0,超时后。
完整代码如下。
// RXJS SUBJECT AND ASSOCIATED OBSERVABLE
private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {
let query = elasticFilteredQuery.toString();
// noinspection UnnecessaryLocalVariableJS
let restStream = this.http.post(BASE_URL + '_search', query, this.options)
.do(() => {
console.log('firing post');
})
.timeout(Config.http.timeout, new Error('timeout'))
.map((response: any) => {
return {
documents: response.json().hits.hits.map((hit: any) => {
return this.createDocumentResult(hit);
}),
numDocuments: response.json().hits.total,
elasticFilteredQuery
};
});
return restStream;
}).publish().refCount();
// EMIT TO RXJS SUBJECT - this is being called at the correct times
public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
this.getDocumentsSubject.next(elasticFilteredQuery);
console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}
// SUBSCRIPTION
this.esQueryService.getDocuments$.subscribe((response: any) => {
console.log('XXXXX NEXT');
...
}, (err: any) => {
console.log('XXXXX error');
...
}, () => {
console.log('XXXXX completed');
}
);
这个答案完全基于这样的假设,即您想将 getDocuments$
用作永久流,每当有新查询 出现时,它都会发出新数据 . (如果不是这种情况,那么答案可能对您没有帮助)
然而,这不会像这样工作,因为每当在流上发出错误时,Stream 本质上是 dead。 (另见
这是您的 rxjs 架构中的一个基本问题:应该在一次性进程(如 rest-call)上抛出错误,但是数据流(如 documents$
)通常是那里确保任何最终错误都已得到处理,并且永久流中发出的任何内容 (next'ed
) 都是可靠且有效的数据。
所以我的建议是使用 .catch()
优雅地处理错误,并简单地跳过此调用的文档的发送。
有点跑题,可能不相关:
在任何情况下,休息呼叫硬超时都是非常不寻常的情况,如果您想节省服务器电源,那么我建议在服务器端处理此问题。另一个非常常见的情况是,您可能只想在触发下一个查询之前接受响应,以防止 较旧、较慢的 查询在呈现新查询后显示,如果是这种情况,那么您可以使用简单的 .takeUntil(this.getDocumentSubject)
:
this.http.post(BASE_URL + '_search', query, this.options)
.takeUntil(this.getDocumentSubject)
.do(...
作为替代方案,您可以使用 switchMap
而不是 flatMap
您所描述的是 RxJS(以及所有响应式扩展)的设计行为。这不是问题也不是错误。这就是它应该如何工作的。任何错误或完整信号都会进行递归 unsubscribe()
调用。你绝对不是第一个在 SO 上问这个问题的人。
查看类似内容:
当使用 Subjects 时,这可能会出现意外行为,因为 Subject 将自己标记为 "stopped"。见 https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56.
使用 catch()
运算符有一个重要的 "catch"。此运算符允许您 重新订阅 到同一个 Observable,这可能是您不想要的,因为它可能会触发更多 HTTP 请求,然后再次失败并形成无限循环。请注意,此运算符不会忽略错误。