Observable.interval 与 flatMap 组合使用时不起作用
Observable interval flatMap combination doesn't work
我是 RxJava 2 的新手，我正在尝试以给定的时间间隔在后台执行 someMethod，并在 UI 线程上使用其结果。有人可以指出我的代码哪里出错了吗？或者更好的是，能否提供一段可以满足我需求的最佳代码？
/**
 * Stop hook from the question: tries to end the interval stream, then delegates to the superclass.
 *
 * NOTE(review): neither statement below actually stops the stream; see the inline notes.
 */
@Override
protected void onStop() {
// Pushes the sentinel value 10005 into `subject`.
// NOTE(review): nothing visible in this snippet feeds `subject` into the interval pipeline,
// so the takeWhile predicate never sees this value.
subject.onNext(Long.valueOf(10005));
// NOTE(review): unsubscribeOn(...) returns a new Observable and the result is discarded here,
// so the already-running subscription is untouched.
observable.unsubscribeOn(Schedulers.io());
super.onStop();
}
/**
 * Question's attempt at a periodic updater: builds a 500 ms interval stream, maps every tick to
 * a call of someMethod(), and observes the Float results on the Android main thread.
 *
 * NOTE(review): this version cannot be stopped from onStop(); see the inline notes.
 */
private void initAzimuthUpdater() {
// NOTE(review): `subject` is created here but never connected to `observable` in this method.
subject = PublishSubject.create();
observable = Observable.interval(500, TimeUnit.MILLISECONDS)
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
Log.d(TAG, "xxxxxxxxxxxx test: " + aLong);
// NOTE(review): both operands are boxed Long objects, so != compares references,
// not numeric values; use equals() or unbox for a value comparison.
return aLong != Long.valueOf(10005);
}
});
observable.flatMap(new Function<Long, ObservableSource<Float>>() {
@Override
public ObservableSource<Float> apply(@NonNull Long aLong) throws Exception {
// NOTE(review): PublishSubject has no create(ObservableOnSubscribe) of its own; this
// presumably resolves to the inherited static Observable.create — confirm.
return PublishSubject.create(new ObservableOnSubscribe<Float>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Float> e) throws Exception {
// Emits a single value; the emitter is never completed, so this inner source never terminates.
e.onNext(someMethod());
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Float>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// NOTE(review): `d` is not stored anywhere, so the subscription cannot be disposed later.
Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
isRunning = true;
}
@Override
public void onNext(@NonNull Float o) {
Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
//update UI
}
@Override
public void onError(@NonNull Throwable e) {
// Only a timestamp is logged; the Throwable itself is dropped.
Log.d(TAG, "xxxxxxxx onError:" + System.currentTimeMillis());
}
@Override
public void onComplete() {
Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
}
});
// NOTE(review): mergeWith(...) returns a new Observable that is discarded here, and
// `azimuthObservable` is not defined anywhere in this snippet.
subject.mergeWith(azimuthObservable);
}
您应该使用 DisposableObserver 和 subscribeWith，将返回的 Disposable 保存到 CompositeDisposable 中，然后在 onStop() 中调用 clear()。Observable 的运算符会返回一个新实例，因此如果忽略它们的返回值，原始流不会受到任何影响。
// Holds the Disposables of every active subscription so they can be released together.
CompositeDisposable composite = new CompositeDisposable();
/**
 * Disposes everything previously added to {@code composite}, then delegates to the superclass.
 * clear() is used so the same container can accept new Disposables afterwards.
 */
@Override
protected void onStop() {
composite.clear();
super.onStop();
}
/**
 * Starts polling {@code someMethod()} every 500 ms on the io scheduler and delivers each
 * Float result to the Android main thread.
 *
 * The resulting Disposable is added to {@code composite}, so clearing {@code composite}
 * (as onStop() does) stops the polling.
 */
private void initAzimuthUpdater() {
    // Run the ticks on the io scheduler directly. subscribeOn(io) does not move interval()
    // off its default computation scheduler, so someMethod() would not run on io otherwise.
    Disposable d = Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
            .flatMap(new Function<Long, ObservableSource<Float>>() {
                @Override
                public ObservableSource<Float> apply(@NonNull Long aLong)
                        throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Float>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<Float> e)
                                throws Exception {
                            e.onNext(someMethod());
                            // Complete the one-shot inner source so flatMap can release it;
                            // otherwise one never-ending inner source piles up per tick.
                            e.onComplete();
                        }
                    });
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<Float>() {
                @Override
                public void onStart() {
                    // Called once when the subscription is established.
                    Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
                    isRunning = true;
                }

                @Override
                public void onNext(@NonNull Float o) {
                    Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
                    //update UI
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // Pass the Throwable so the failure cause is not silently lost.
                    Log.d(TAG, "xxxxxxxx onError:" + System.currentTimeMillis(), e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
                }
            });
    composite.add(d);
}
我是 RxJava 2 的新手，我正在尝试以给定的时间间隔在后台执行 someMethod，并在 UI 线程上使用其结果。有人可以指出我的代码哪里出错了吗？或者更好的是，能否提供一段可以满足我需求的最佳代码？
/**
 * Stop hook from the question: tries to end the interval stream, then delegates to the superclass.
 *
 * NOTE(review): neither statement below actually stops the stream; see the inline notes.
 */
@Override
protected void onStop() {
// Pushes the sentinel value 10005 into `subject`.
// NOTE(review): nothing visible in this snippet feeds `subject` into the interval pipeline,
// so the takeWhile predicate never sees this value.
subject.onNext(Long.valueOf(10005));
// NOTE(review): unsubscribeOn(...) returns a new Observable and the result is discarded here,
// so the already-running subscription is untouched.
observable.unsubscribeOn(Schedulers.io());
super.onStop();
}
/**
 * Question's attempt at a periodic updater: builds a 500 ms interval stream, maps every tick to
 * a call of someMethod(), and observes the Float results on the Android main thread.
 *
 * NOTE(review): this version cannot be stopped from onStop(); see the inline notes.
 */
private void initAzimuthUpdater() {
// NOTE(review): `subject` is created here but never connected to `observable` in this method.
subject = PublishSubject.create();
observable = Observable.interval(500, TimeUnit.MILLISECONDS)
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
Log.d(TAG, "xxxxxxxxxxxx test: " + aLong);
// NOTE(review): both operands are boxed Long objects, so != compares references,
// not numeric values; use equals() or unbox for a value comparison.
return aLong != Long.valueOf(10005);
}
});
observable.flatMap(new Function<Long, ObservableSource<Float>>() {
@Override
public ObservableSource<Float> apply(@NonNull Long aLong) throws Exception {
// NOTE(review): PublishSubject has no create(ObservableOnSubscribe) of its own; this
// presumably resolves to the inherited static Observable.create — confirm.
return PublishSubject.create(new ObservableOnSubscribe<Float>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Float> e) throws Exception {
// Emits a single value; the emitter is never completed, so this inner source never terminates.
e.onNext(someMethod());
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Float>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// NOTE(review): `d` is not stored anywhere, so the subscription cannot be disposed later.
Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
isRunning = true;
}
@Override
public void onNext(@NonNull Float o) {
Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
//update UI
}
@Override
public void onError(@NonNull Throwable e) {
// Only a timestamp is logged; the Throwable itself is dropped.
Log.d(TAG, "xxxxxxxx onError:" + System.currentTimeMillis());
}
@Override
public void onComplete() {
Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
}
});
// NOTE(review): mergeWith(...) returns a new Observable that is discarded here, and
// `azimuthObservable` is not defined anywhere in this snippet.
subject.mergeWith(azimuthObservable);
}
您应该使用 DisposableObserver 和 subscribeWith，将返回的 Disposable 保存到 CompositeDisposable 中，然后在 onStop() 中调用 clear()。Observable 的运算符会返回一个新实例，因此如果忽略它们的返回值，原始流不会受到任何影响。
// Holds the Disposables of every active subscription so they can be released together.
CompositeDisposable composite = new CompositeDisposable();
/**
 * Disposes everything previously added to {@code composite}, then delegates to the superclass.
 * clear() is used so the same container can accept new Disposables afterwards.
 */
@Override
protected void onStop() {
composite.clear();
super.onStop();
}
/**
 * Starts polling {@code someMethod()} every 500 ms on the io scheduler and delivers each
 * Float result to the Android main thread.
 *
 * The resulting Disposable is added to {@code composite}, so clearing {@code composite}
 * (as onStop() does) stops the polling.
 */
private void initAzimuthUpdater() {
    // Run the ticks on the io scheduler directly. subscribeOn(io) does not move interval()
    // off its default computation scheduler, so someMethod() would not run on io otherwise.
    Disposable d = Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
            .flatMap(new Function<Long, ObservableSource<Float>>() {
                @Override
                public ObservableSource<Float> apply(@NonNull Long aLong)
                        throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Float>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<Float> e)
                                throws Exception {
                            e.onNext(someMethod());
                            // Complete the one-shot inner source so flatMap can release it;
                            // otherwise one never-ending inner source piles up per tick.
                            e.onComplete();
                        }
                    });
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<Float>() {
                @Override
                public void onStart() {
                    // Called once when the subscription is established.
                    Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
                    isRunning = true;
                }

                @Override
                public void onNext(@NonNull Float o) {
                    Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
                    //update UI
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // Pass the Throwable so the failure cause is not silently lost.
                    Log.d(TAG, "xxxxxxxx onError:" + System.currentTimeMillis(), e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
                }
            });
    composite.add(d);
}