RxJava:创建一个自依赖流
RxJava: create a self-dependent stream
说,我想在 RxJava 中做一个 Observable
,它有一个反馈耦合,如下图所示。
我已经通过使用主题实现了这一目标,如下所示:
// Observable<Integer> source = Observable.range(0, 6);
public Observable<Integer> getFeedbackSum(Observable<Integer> source) {
UnicastSubject<Integer> feedback = UnicastSubject.create();
Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create)
.map(pair -> pair.first + pair.second);
feedbackSum.subscribe(feedback);
return feedbackSum;
}
看起来有点丑。有没有更好的方法?
这里有一个运算符:scan
:
public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)
Observable.range(0, 6)
.scan(0, (a, b) -> a + b)
.test()
.assertResut(0, 1, 3, 6, 10, 15, 21);
如果你的累加器类型不是不可变的,你可以使用 scanWith
:
public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)
说,我想在 RxJava 中做一个 Observable
,它有一个反馈耦合,如下图所示。
我已经通过使用主题实现了这一目标,如下所示:
// Observable<Integer> source = Observable.range(0, 6);
public Observable<Integer> getFeedbackSum(Observable<Integer> source) {
UnicastSubject<Integer> feedback = UnicastSubject.create();
Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create)
.map(pair -> pair.first + pair.second);
feedbackSum.subscribe(feedback);
return feedbackSum;
}
看起来有点丑。有没有更好的方法?
这里有一个运算符:scan
:
public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)
Observable.range(0, 6)
.scan(0, (a, b) -> a + b)
.test()
.assertResut(0, 1, 3, 6, 10, 15, 21);
如果你的累加器类型不是不可变的,你可以使用 scanWith
:
public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)