RxJava Observable.cache 失效
RxJava Observable.cache invalidate
我正在尝试在 Android 环境中学习 rxjava。
假设我有一个发出网络调用结果的可观察对象。
如果我理解正确,处理配置更改的一种广泛常用的方法是:
将可观察对象存储在保留的片段/单例/应用程序对象中
将 cache 运算符应用于 observable
在适当的生命周期处理程序中订阅/取消订阅
这样做,我们不会丢失 observable 的结果,一旦新配置发生,它将重新 observed。
现在,我的问题是:
有没有办法强制 observable 发出一个新值(并使缓存的值无效)?每次我想要来自网络的新数据时,我是否需要创建一个新的可观察对象(这在 android 世界中听起来不是一个坏习惯,因为会让 gc 做额外的工作)?
非常感谢,
费德里科
制作一个自定义的 OnSubscribe
实现来满足您的需求:
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {
private final AtomicBoolean refresh = new AtomicBoolean(true);
private final Observable<T> source;
private volatile Observable<T> current;
public OnSubscribeRefreshingCache(Observable<T> source) {
this.source = source;
this.current = source;
}
public void reset() {
refresh.set(true);
}
@Override
public void call(Subscriber<? super T> subscriber) {
if (refresh.compareAndSet(true, false)) {
current = source.cache();
}
current.unsafeSubscribe(subscriber);
}
}
这段代码演示了用法并表明缓存实际上正在被重置:
Observable<Integer> o = Observable.just(1)
.doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher =
new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);
输出:
completed
1
1
completed
1
顺便说一下,您可能会注意到 .cache
直到完成才发出。这是一个应该由 rxjava 1.0.14 修复的错误。
就您的 GC 压力问题而言,每个应用于 Observable 的运算符通常通过 lift
或 create
创建一个新的 Observable。与创建新 Observable 相关联的基本成员状态是对 onSubscribe
函数的引用。 cache
与大多数不同,因为它在订阅中保存状态,如果它保存大量状态并经常被丢弃,则可能会产生 GC 压力。即使您使用相同的可变数据结构来保存重置后的状态,GC 仍然必须在清除时处理数据结构的内容,因此您可能不会获得太多收益。
RxJava cache
运算符是为多个并发订阅而构建的。您可能会想象重置功能可能会在实施时出现问题。如果您想进一步探索,请务必在 RxJava github 上提出问题。
我正在尝试在 Android 环境中学习 rxjava。 假设我有一个发出网络调用结果的可观察对象。 如果我理解正确,处理配置更改的一种广泛常用的方法是:
将可观察对象存储在保留的片段/单例/应用程序对象中
将 cache 运算符应用于 observable
在适当的生命周期处理程序中订阅/取消订阅
这样做,我们不会丢失 observable 的结果,一旦新配置发生,它将重新 observed。
现在,我的问题是:
有没有办法强制 observable 发出一个新值(并使缓存的值无效)?每次我想要来自网络的新数据时,我是否需要创建一个新的可观察对象(这在 android 世界中听起来不是一个坏习惯,因为会让 gc 做额外的工作)?
非常感谢,
费德里科
制作一个自定义的 OnSubscribe
实现来满足您的需求:
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {
private final AtomicBoolean refresh = new AtomicBoolean(true);
private final Observable<T> source;
private volatile Observable<T> current;
public OnSubscribeRefreshingCache(Observable<T> source) {
this.source = source;
this.current = source;
}
public void reset() {
refresh.set(true);
}
@Override
public void call(Subscriber<? super T> subscriber) {
if (refresh.compareAndSet(true, false)) {
current = source.cache();
}
current.unsafeSubscribe(subscriber);
}
}
这段代码演示了用法并表明缓存实际上正在被重置:
Observable<Integer> o = Observable.just(1)
.doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher =
new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);
输出:
completed
1
1
completed
1
顺便说一下,您可能会注意到 .cache
直到完成才发出。这是一个应该由 rxjava 1.0.14 修复的错误。
就您的 GC 压力问题而言,每个应用于 Observable 的运算符通常通过 lift
或 create
创建一个新的 Observable。与创建新 Observable 相关联的基本成员状态是对 onSubscribe
函数的引用。 cache
与大多数不同,因为它在订阅中保存状态,如果它保存大量状态并经常被丢弃,则可能会产生 GC 压力。即使您使用相同的可变数据结构来保存重置后的状态,GC 仍然必须在清除时处理数据结构的内容,因此您可能不会获得太多收益。
RxJava cache
运算符是为多个并发订阅而构建的。您可能会想象重置功能可能会在实施时出现问题。如果您想进一步探索,请务必在 RxJava github 上提出问题。