如何通过 `.publishReplay()` 在带有缓存的 RxJS Observable 上设置超时?

How to set a timeout on a RxJS Observable with a cache throught `.publishReplay()`?

我创建了一个 Observable 来缓存它们的结果一段时间。这个例子很棒而且非常有用!!但是我无法为项目生产者设置超时。我试图在 mockDataFetch() 中使用超时运算符,但在第一个失败的项目之后,流无法恢复。 如何实现 mockDataFetch 超时?

这正是我所做的:

const Observable = Rx.Observable;

var counter = 1;
var updateRequest = Observable.defer(() => mockDataFetch())
    .publishReplay(1, 1000)
    .refCount();

function mockDataFetch() {
    return Observable.of(counter++)
        .delay(Math.floor((Math.random() * 100) + 1))
        .timeout(50);
}

function mockHttpCache() {
    return updateRequest
        .take(1);
}

另一方面,如果在 mockDataFetch 中得到一个 Excpetion 会怎样?我希望在下一个项目上(1000 毫秒后,如 publishReplay 方法中定义的那样),可观察对象会发出一个新项目。

我想我应该更新示例并添加这个 use-case 因为这是很常见的情况(无论如何,我很高兴你发现它有用!)。

当从 mockDataFetch() 返回的 Observable 发送 error/complete 通知时,内部的 Subject 将自己标记为已停止(参见解释 ),因此它不会重新发送任何项目。理想情况下,您可以使用 mockDataFetch():

中的 catch() 运算符捕获所有错误
function mockDataFetch() {
    return Observable.of(counter++)
        .delay(Math.floor((Math.random() * 100) + 1))
        .timeout(50)
        .catch(err => Observable.of('This request is broken.'));
}

观看现场演示:https://jsbin.com/jiguti/5/edit?js,console

此输出可能如下所示:

Response 0: This request is broken.
Response 50: This request is broken.
Response 200: This request is broken.
Response 1200: 2
Response 1500: 2
Response 3500: This request is broken.