为进行中的 API 请求重新使用可观察对象
Re-using observables for in-flight API requests
我有一个组件 A,它自己查询 API 并使用 RX 将结果存储在缓存中:
List<Data> cache = ...;
void queryData(int parameter) {
myApi.getData(parameter).subscribe(data => cache.add(data));
}
现在,我想从其他组件查询这个缓存。在缓存未命中时,我希望设计允许重新使用正在进行的订阅(如果存在)。这是为了避免重复相同的请求。
我正在考虑将 Observable
存储在单独的缓存中(仍在我的组件 A 中)。
List<Observable<Data>> requestCache = ...;
public Observable<Data> getData(int parameter) {
// if cache contains data, return new observable that emits that item
// if cache does not contain data, check if requestCache contains the Observable I need. If so, return it.
// if all else fails, return queryData(parameter)
}
我需要将我原来的 queryData
修改为:
Observable<Data> queryData(int parameter) {
Observable<Data> observable = myApi.getData(parameter);
requestCache.add(observable);
observable.subscribe(data => cache.add(data));
observable.subscribe(data => requestCache.remove(observable));
return observable;
}
这将完全隐藏这些请求对其他组件的 caching/in 飞行复杂性。但是,我想知道 RX 中是否有内置的方法来处理这个问题。
我走在正确的轨道上还是有更好的模式?
没有执行此操作的内置方法。以下是我将如何解决这个问题:
public final class ReactiveCache<K, V> {
final Map<K, Observable<V>> requests = new HashMap<>();
final Function<K, Observable<V>> generator;
public ReactiveCache(Function<K, Observable<V>> generator) {
this.generator = generator;
}
public Observable<V> get(K key) {
ConnectableObservable<V> result;
synchronized (requests) {
Observable<V> current = requests.get(key);
if (current != null) {
return current;
}
result = generator.apply(key)
.doOnTerminate(() -> {
synchronized (requests) {
requests.remove(key);
}
})
.replay();
requests.put(key, result);
}
return result.autoConnect(0);
}
}
在这个 class 中,当您调用 get() 并且有一个运行中的可观察对象时,您可以加入它并接收它的所有值。如果有 none 或者飞行中已经完成,你将开始一个新的 Observable。
我有一个组件 A,它自己查询 API 并使用 RX 将结果存储在缓存中:
List<Data> cache = ...;
void queryData(int parameter) {
myApi.getData(parameter).subscribe(data => cache.add(data));
}
现在,我想从其他组件查询这个缓存。在缓存未命中时,我希望设计允许重新使用正在进行的订阅(如果存在)。这是为了避免重复相同的请求。
我正在考虑将 Observable
存储在单独的缓存中(仍在我的组件 A 中)。
List<Observable<Data>> requestCache = ...;
public Observable<Data> getData(int parameter) {
// if cache contains data, return new observable that emits that item
// if cache does not contain data, check if requestCache contains the Observable I need. If so, return it.
// if all else fails, return queryData(parameter)
}
我需要将我原来的 queryData
修改为:
Observable<Data> queryData(int parameter) {
Observable<Data> observable = myApi.getData(parameter);
requestCache.add(observable);
observable.subscribe(data => cache.add(data));
observable.subscribe(data => requestCache.remove(observable));
return observable;
}
这将完全隐藏这些请求对其他组件的 caching/in 飞行复杂性。但是,我想知道 RX 中是否有内置的方法来处理这个问题。
我走在正确的轨道上还是有更好的模式?
没有执行此操作的内置方法。以下是我将如何解决这个问题:
public final class ReactiveCache<K, V> {
final Map<K, Observable<V>> requests = new HashMap<>();
final Function<K, Observable<V>> generator;
public ReactiveCache(Function<K, Observable<V>> generator) {
this.generator = generator;
}
public Observable<V> get(K key) {
ConnectableObservable<V> result;
synchronized (requests) {
Observable<V> current = requests.get(key);
if (current != null) {
return current;
}
result = generator.apply(key)
.doOnTerminate(() -> {
synchronized (requests) {
requests.remove(key);
}
})
.replay();
requests.put(key, result);
}
return result.autoConnect(0);
}
}
在这个 class 中,当您调用 get() 并且有一个运行中的可观察对象时,您可以加入它并接收它的所有值。如果有 none 或者飞行中已经完成,你将开始一个新的 Observable。