在 Observables 中包装事件监听器
Wrapping event listeners in Observables
我看过很多关于如何将数组或 Iterable
s 之类的有限事物转换为 Observable
s 的示例,但我不确定我是否理解如何制作 Observable
像事件接收者一样活跃且有效地不受限制。我研究了 RxJava2 文档并想出了这个,使用 Android LocationListener
作为示例。
有没有更简单 and/or 更正确的方法来做到这一点? 我知道 "RxBus" 概念,但它似乎是一种方法坚持旧的事件总线范例。
final Observable<Location> locationObservable = Observable.create(new ObservableOnSubscribe<Location>() {
final LocationManager mLocationManager = (LocationManager) getSystemService(LOCATION_SERVICE);
@Override
public void subscribe(final ObservableEmitter<Location> emitter) throws Exception {
final LocationListener listener = new LocationListener() {
@Override
public void onLocationChanged(final Location location) {
emitter.onNext(location);
}
@Override
public void onStatusChanged(final String s, final int i, final Bundle bundle) {
// TODO ???
}
@Override
public void onProviderEnabled(final String s) {
// TODO ???
}
@Override
public void onProviderDisabled(final String s) {
// TODO ???
}
};
mLocationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, 0, 0, listener);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mLocationManager.removeUpdates(listener);
}
});
emitter.setDisposable(new Disposable() {
private AtomicBoolean mDisposed;
@Override
public void dispose() {
if(mDisposed.compareAndSet(false, true)) {
mLocationManager.removeUpdates(listener);
}
}
@Override
public boolean isDisposed() {
return mDisposed.get();
}
});
}
});
使用Observable.create()
确实是一个正确的方法。
但是,对于 RxJava2,默认方式是扩展 Observable,您可以查看此 了解更多详细信息。
关于您的实施的一些评论:
- Cancellable
和 Disposable
都设置没有意义,因为后面的 cancel/dispose 第一个,你可以看到它们之间的区别 .
- 我认为最好的做法是在开始收听更新之前注册 cancellable/disposable,以防止出现奇怪的边缘情况。
我看过很多关于如何将数组或 Iterable
s 之类的有限事物转换为 Observable
s 的示例,但我不确定我是否理解如何制作 Observable
像事件接收者一样活跃且有效地不受限制。我研究了 RxJava2 文档并想出了这个,使用 Android LocationListener
作为示例。
有没有更简单 and/or 更正确的方法来做到这一点? 我知道 "RxBus" 概念,但它似乎是一种方法坚持旧的事件总线范例。
final Observable<Location> locationObservable = Observable.create(new ObservableOnSubscribe<Location>() {
final LocationManager mLocationManager = (LocationManager) getSystemService(LOCATION_SERVICE);
@Override
public void subscribe(final ObservableEmitter<Location> emitter) throws Exception {
final LocationListener listener = new LocationListener() {
@Override
public void onLocationChanged(final Location location) {
emitter.onNext(location);
}
@Override
public void onStatusChanged(final String s, final int i, final Bundle bundle) {
// TODO ???
}
@Override
public void onProviderEnabled(final String s) {
// TODO ???
}
@Override
public void onProviderDisabled(final String s) {
// TODO ???
}
};
mLocationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, 0, 0, listener);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mLocationManager.removeUpdates(listener);
}
});
emitter.setDisposable(new Disposable() {
private AtomicBoolean mDisposed;
@Override
public void dispose() {
if(mDisposed.compareAndSet(false, true)) {
mLocationManager.removeUpdates(listener);
}
}
@Override
public boolean isDisposed() {
return mDisposed.get();
}
});
}
});
使用Observable.create()
确实是一个正确的方法。
但是,对于 RxJava2,默认方式是扩展 Observable,您可以查看此
关于您的实施的一些评论:
- Cancellable
和 Disposable
都设置没有意义,因为后面的 cancel/dispose 第一个,你可以看到它们之间的区别
- 我认为最好的做法是在开始收听更新之前注册 cancellable/disposable,以防止出现奇怪的边缘情况。