为进行中的 API 请求重新使用可观察对象

Re-using observables for in-flight API requests

我有一个组件 A,它自己查询 API 并使用 RX 将结果存储在缓存中:

List<Data> cache = ...;

void queryData(int parameter) {
    myApi.getData(parameter).subscribe(data => cache.add(data));
}

现在,我想从其他组件查询这个缓存。在缓存未命中时,我希望设计允许重新使用正在进行的订阅(如果存在)。这是为了避免重复相同的请求。

我正在考虑将 Observable 存储在单独的缓存中(仍在我的组件 A 中)。

List<Observable<Data>> requestCache = ...;

public Observable<Data> getData(int parameter) {
    // if cache contains data, return new observable that emits that item

    // if cache does not contain data, check if requestCache contains the Observable I need. If so, return it.

    // if all else fails, return queryData(parameter)

} 

我需要将我原来的 queryData 修改为:

Observable<Data> queryData(int parameter) {
    Observable<Data> observable = myApi.getData(parameter);

    requestCache.add(observable);

    observable.subscribe(data => cache.add(data));
    observable.subscribe(data => requestCache.remove(observable));

    return observable;
}

这将完全隐藏这些请求对其他组件的 caching/in 飞行复杂性。但是,我想知道 RX 中是否有内置的方法来处理这个问题。

我走在正确的轨道上还是有更好的模式?

没有执行此操作的内置方法。以下是我将如何解决这个问题:

public final class ReactiveCache<K, V> {
    final Map<K, Observable<V>> requests = new HashMap<>();

    final Function<K, Observable<V>> generator;

    public ReactiveCache(Function<K, Observable<V>> generator) {
        this.generator = generator;
    }

    public Observable<V> get(K key) {
        ConnectableObservable<V> result;
        synchronized (requests) {
            Observable<V> current = requests.get(key);
            if (current != null) {
                return current;
            }

            result = generator.apply(key)
                    .doOnTerminate(() -> {
                        synchronized (requests) {
                            requests.remove(key);
                        }
                    })
                    .replay();

            requests.put(key, result);
        }

        return result.autoConnect(0);
    }
}

在这个 class 中,当您调用 get() 并且有一个运行中的可观察对象时,您可以加入它并接收它的所有值。如果有 none 或者飞行中已经完成,你将开始一个新的 Observable。