RxJava Observable.cache 失效

RxJava Observable.cache invalidate

我正在尝试在 Android 环境中学习 rxjava。 假设我有一个发出网络调用结果的可观察对象。 如果我理解正确,处理配置更改的一种广泛常用的方法是:

这样做,我们不会丢失 observable 的结果,一旦新配置发生,它将重新 observed。

现在,我的问题是:

有没有办法强制 observable 发出一个新值(并使缓存的值无效)?每次我想要来自网络的新数据时,我是否需要创建一个新的可观察对象(这在 android 世界中听起来不是一个坏习惯,因为会让 gc 做额外的工作)?

非常感谢,

费德里科

制作一个自定义的 OnSubscribe 实现来满足您的需求:

public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

    private final AtomicBoolean refresh = new AtomicBoolean(true);
    private final Observable<T> source;
    private volatile Observable<T> current;

    public OnSubscribeRefreshingCache(Observable<T> source) {
        this.source = source;
        this.current = source;
    }

    public void reset() {
        refresh.set(true);
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        if (refresh.compareAndSet(true, false)) {
            current = source.cache();
        }
        current.unsafeSubscribe(subscriber);
    }

}

这段代码演示了用法并表明缓存实际上正在被重置:

Observable<Integer> o = Observable.just(1)
        .doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher = 
    new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);

输出:

completed
1
1
completed
1

顺便说一下,您可能会注意到 .cache 直到完成才发出。这是一个应该由 rxjava 1.0.14 修复的错误。

就您的 GC 压力问题而言,每个应用于 Observable 的运算符通常通过 liftcreate 创建一个新的 Observable。与创建新 Observable 相关联的基本成员状态是对 onSubscribe 函数的引用。 cache 与大多数不同,因为它在订阅中保存状态,如果它保存大量状态并经常被丢弃,则可能会产生 GC 压力。即使您使用相同的可变数据结构来保存重置后的状态,GC 仍然必须在清除时处理数据结构的内容,因此您可能不会获得太多收益。

RxJava cache 运算符是为多个并发订阅而构建的。您可能会想象重置功能可能会在实施时出现问题。如果您想进一步探索,请务必在 RxJava github 上提出问题。