用于缓存和延迟数据检索的可观察模式
Observable patterns for caching and defered data retrieval
我正在尝试使用 RxJS Observable 在 angular 中创建一个缓存函数。最初我使用 angularjs $q
的延迟承诺创建了这个方法。 Observables 和 RxJS 对我来说是新的,我发现这种工作方法仍然有些混乱。
这是我目前实现的一个getOrCreate
缓存功能。从存储中检索键的单个值 (this.get()
),如果它不在其中,则在其他地方检索它 (fetcher
)。
假设 fetcher
是比 this.get()
慢的数据源。当我们仍在从 this.get()
中检索时,可能会触发对同一键的多个请求,因此我放入了一个聚合器:只为同一键的多个请求创建一个可观察对象。
protected observableCache : {[key: string] : Observable<any>} = {};
get<T>(key : string): Observable<T> { /* Async data retrieval */ }
getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
const keyHash = this.hash(key);
// Check if an observable for the same key is already in flight
if (this.observableCache[keyHash]) {
return this.observableCache[keyHash];
} else {
let observable : Observable<T>;
this.get(key).subscribe(
// Cache hit
(result) => { observable = Observable.of(result); },
// Cache miss. Retrieving from fetching while creating entry
() => {
fetcher().subscribe((fetchedResult) => {
if(fetchedResult) {
this.put(key, fetchedResult);
}
observable = Observable.of(fetchedResult);
});
}
);
// Register and unregister in-flight observables
this.observableCache[keyHash] = observable;
observable.subscribe(()=> {
delete this.observableCache[this.hash(key)];
});
return observable;
}
}
这是该代码的当前版本,但看起来我没有正确处理异步代码:
- Observable 将在实例化之前返回:
return observable
在 observable = Observable.of(result)
之前触发;
- 在
this.get()
仍在进行中时,聚合对同一密钥的所有请求可能有更好的模式。
有人可以帮助找到应该使用的观察者模式吗?
我认为这可能有效。重写为:
getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
const keyHash = this.hash(key);
// Check if an observable for the same key is already in flight
if (this.observableCache[keyHash]) {
return this.observableCache[keyHash];
}
let observable : ConnectableObservable<T> = this.get(key)
.catch(() => { // Catch is for when the source observable throws error: It replaces it with the new Rx.Observable that is returned by it's method
// Cache miss. Retrieving from fetching while creating entry
return this.fetchFromFetcher(key, fetcher);
})
.publish();
// Register and unregister in-flight observables
this.observableCache[keyHash] = observable;
observable.subscribe(()=> {
delete this.observableCache[keyHash];
});
observable.connect();
return observable;
},
fetchFromFetcher(key : string, fetcher: () => Observable<T>) : Observable<T> {
// Here we create a stream that subscribes to fetcher to use `this.put(...)`, returning the original value when done
return Rx.Observable.create(observer => {
fetcher().subscribe(fetchedResult => {
this.put(key, fetchedResult);
observer.next(fetchedResult);
},
err => observer.error(err),
() => observer.complete())
});
}
解释:
- Observables 与 promises 非常不同。他们要处理异步的东西,有一些相似之处,但他们有很大的不同
- 因为
this.get(...)
看起来是异步的,所以你的 let observable
在它产生一个值之前不会被填充,所以当你将它分配给你的缓存时,它是 null 是正常的。
- 可观察对象的一大优点(以及与 promises 的主要区别)是您可以在执行任何操作之前定义一个流。在我的解决方案中,在调用
observable.connect()
之前不会调用任何内容。这避免了这么多 .subscriptions
- 所以,在我的代码中,我得到了
this.get(key)
流,并告诉它如果失败 (.catch(...)
) 它必须获取结果,但是一旦获取结果就将其放入缓存(this.put(key, fetchedResult
)
- 然后我
publish()
这个 observable:这使得它的行为更像 promise,它使它 "hot"。这意味着所有订阅者都将从同一个流中获取值,而不是每次订阅时都创建一个从 0 开始的新流。
- 然后我把它存在observable pool里,设置完了就删除
- 最后,我
.connect()
。这只有在你 publish()
它时才会完成,它是实际订阅原始流的东西,执行你想要的一切。
为了清楚起见,因为这是来自 Promises 的常见错误,在 angular 中,如果您将流定义为:
let myRequest = this.http.get("http://www.example.com/")
.map((result) => result.json());
请求尚未发送。每次你做 myRequest.subscribe()
时,都会向服务器发出新的请求,它不会重用 "first subscription" 结果。这就是 .publish()
非常有用的原因:它使得当您调用 .connect()
时它会创建一个触发请求的订阅,并将与所有传入订阅共享最后收到的结果(Observables 支持流:许多结果)到已发布的可观察对象。
我正在尝试使用 RxJS Observable 在 angular 中创建一个缓存函数。最初我使用 angularjs $q
的延迟承诺创建了这个方法。 Observables 和 RxJS 对我来说是新的,我发现这种工作方法仍然有些混乱。
这是我目前实现的一个getOrCreate
缓存功能。从存储中检索键的单个值 (this.get()
),如果它不在其中,则在其他地方检索它 (fetcher
)。
假设 fetcher
是比 this.get()
慢的数据源。当我们仍在从 this.get()
中检索时,可能会触发对同一键的多个请求,因此我放入了一个聚合器:只为同一键的多个请求创建一个可观察对象。
protected observableCache : {[key: string] : Observable<any>} = {};
get<T>(key : string): Observable<T> { /* Async data retrieval */ }
getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
const keyHash = this.hash(key);
// Check if an observable for the same key is already in flight
if (this.observableCache[keyHash]) {
return this.observableCache[keyHash];
} else {
let observable : Observable<T>;
this.get(key).subscribe(
// Cache hit
(result) => { observable = Observable.of(result); },
// Cache miss. Retrieving from fetching while creating entry
() => {
fetcher().subscribe((fetchedResult) => {
if(fetchedResult) {
this.put(key, fetchedResult);
}
observable = Observable.of(fetchedResult);
});
}
);
// Register and unregister in-flight observables
this.observableCache[keyHash] = observable;
observable.subscribe(()=> {
delete this.observableCache[this.hash(key)];
});
return observable;
}
}
这是该代码的当前版本,但看起来我没有正确处理异步代码:
- Observable 将在实例化之前返回:
return observable
在observable = Observable.of(result)
之前触发; - 在
this.get()
仍在进行中时,聚合对同一密钥的所有请求可能有更好的模式。
有人可以帮助找到应该使用的观察者模式吗?
我认为这可能有效。重写为:
getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
const keyHash = this.hash(key);
// Check if an observable for the same key is already in flight
if (this.observableCache[keyHash]) {
return this.observableCache[keyHash];
}
let observable : ConnectableObservable<T> = this.get(key)
.catch(() => { // Catch is for when the source observable throws error: It replaces it with the new Rx.Observable that is returned by it's method
// Cache miss. Retrieving from fetching while creating entry
return this.fetchFromFetcher(key, fetcher);
})
.publish();
// Register and unregister in-flight observables
this.observableCache[keyHash] = observable;
observable.subscribe(()=> {
delete this.observableCache[keyHash];
});
observable.connect();
return observable;
},
fetchFromFetcher(key : string, fetcher: () => Observable<T>) : Observable<T> {
// Here we create a stream that subscribes to fetcher to use `this.put(...)`, returning the original value when done
return Rx.Observable.create(observer => {
fetcher().subscribe(fetchedResult => {
this.put(key, fetchedResult);
observer.next(fetchedResult);
},
err => observer.error(err),
() => observer.complete())
});
}
解释:
- Observables 与 promises 非常不同。他们要处理异步的东西,有一些相似之处,但他们有很大的不同
- 因为
this.get(...)
看起来是异步的,所以你的let observable
在它产生一个值之前不会被填充,所以当你将它分配给你的缓存时,它是 null 是正常的。 - 可观察对象的一大优点(以及与 promises 的主要区别)是您可以在执行任何操作之前定义一个流。在我的解决方案中,在调用
observable.connect()
之前不会调用任何内容。这避免了这么多 .subscriptions - 所以,在我的代码中,我得到了
this.get(key)
流,并告诉它如果失败 (.catch(...)
) 它必须获取结果,但是一旦获取结果就将其放入缓存(this.put(key, fetchedResult
) - 然后我
publish()
这个 observable:这使得它的行为更像 promise,它使它 "hot"。这意味着所有订阅者都将从同一个流中获取值,而不是每次订阅时都创建一个从 0 开始的新流。 - 然后我把它存在observable pool里,设置完了就删除
- 最后,我
.connect()
。这只有在你publish()
它时才会完成,它是实际订阅原始流的东西,执行你想要的一切。
为了清楚起见,因为这是来自 Promises 的常见错误,在 angular 中,如果您将流定义为:
let myRequest = this.http.get("http://www.example.com/")
.map((result) => result.json());
请求尚未发送。每次你做 myRequest.subscribe()
时,都会向服务器发出新的请求,它不会重用 "first subscription" 结果。这就是 .publish()
非常有用的原因:它使得当您调用 .connect()
时它会创建一个触发请求的订阅,并将与所有传入订阅共享最后收到的结果(Observables 支持流:许多结果)到已发布的可观察对象。