RxJava Observable 在第一次发射时得到通知
RxJava Observable get notified on first emission
我有三个与 combineLastest 结合的 Observables:
Observable<String> o1 = Observable.just("1");
Observable<String> o2 = Observable.just("2");
Observable<String> o3 = Observable.just("3");
Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
@Override
public Object call(String s, String s2, String s3) {
return null;
}
});
我想收到有关其中一个 Observable 的第一次发射的通知,而不忽略后来的发射,我想 first operator 会这样做。是否有一个方便的运算符(例如):
o1.doOnFirst(new Func1<String, Void>() {
@Override
public Void call(String s) {
return null;
}
})
我能想到几个解决方案。
第一个是丑陋但简单的 doOnNext 黑客攻击。只需向 Action1
添加一个布尔字段,指示是否已收到第一项。收到后,做任何您想做的事,然后翻转布尔值。例如:
Observable.just("1").doOnNext(new Action1<String>() {
boolean first = true;
@Override
public void call(String t) {
if (first) {
// Do soemthing
first = false;
}
}
});
第二个是使用 publish
或 share()
对您要监视的可观察对象订阅两次,其中一个发布会通过 first
(取决于您是否想要手动连接到已发布的可观察对象)。你最终会得到两个发射相同项目的独立观察者,只有第一个会在第一次发射后停止:
ConnectableObservable<String> o1 = Observable.just("1").publish();
o1.first().subscribe(System.out::println); //Subscirbed only to the first item
o1.subscribe(System.out::println); //Subscirbed to all items
o1.connect(); //Connect both subscribers
我认为如果您正在处理流,您可以使用简单的 take
来获得实用的 doOnFirst
:
public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source);
}
这样操作只绑定到第一项。
这可以更改为处理 不是 由添加 skip
的流支持的 observables 以跳过已经采取的项目:
public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source.skip(1));
}
observable
.compose(Transformers.doOnFirst(System.out::println))
它已经过单元测试,并且在幕后仅在运算符中使用每个订阅计数器。请注意,每个订阅很重要,因为有很多用例,其中一个可观察实例被多次使用,我们希望每次都应用 doOnFirst
运算符。
源代码为here.
为方便起见,我为 Flowable
和 Observable
创建了这些扩展函数。
请注意,使用 doOnFirst()
将在第一个元素发射之前调用操作,而 doAfterFirst()
将首先发射第一个项目然后执行操作。
fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
用法就这么简单:
Flowable.fromArray(1, 2, 3)
.doOnFirst { System.err.println("First $it") }
.subscribe { println(it) }
输出:
// First 1
// 1
// 2
// 3
并且:
Flowable.fromArray(1, 2, 3)
.doAfterFirst { System.err.println("First $it") }
.subscribe { println(it) }
输出:
// 1
// First 1
// 2
// 3
我有三个与 combineLastest 结合的 Observables:
Observable<String> o1 = Observable.just("1");
Observable<String> o2 = Observable.just("2");
Observable<String> o3 = Observable.just("3");
Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
@Override
public Object call(String s, String s2, String s3) {
return null;
}
});
我想收到有关其中一个 Observable 的第一次发射的通知,而不忽略后来的发射,我想 first operator 会这样做。是否有一个方便的运算符(例如):
o1.doOnFirst(new Func1<String, Void>() {
@Override
public Void call(String s) {
return null;
}
})
我能想到几个解决方案。
第一个是丑陋但简单的 doOnNext 黑客攻击。只需向 Action1
添加一个布尔字段,指示是否已收到第一项。收到后,做任何您想做的事,然后翻转布尔值。例如:
Observable.just("1").doOnNext(new Action1<String>() {
boolean first = true;
@Override
public void call(String t) {
if (first) {
// Do soemthing
first = false;
}
}
});
第二个是使用 publish
或 share()
对您要监视的可观察对象订阅两次,其中一个发布会通过 first
(取决于您是否想要手动连接到已发布的可观察对象)。你最终会得到两个发射相同项目的独立观察者,只有第一个会在第一次发射后停止:
ConnectableObservable<String> o1 = Observable.just("1").publish();
o1.first().subscribe(System.out::println); //Subscirbed only to the first item
o1.subscribe(System.out::println); //Subscirbed to all items
o1.connect(); //Connect both subscribers
我认为如果您正在处理流,您可以使用简单的 take
来获得实用的 doOnFirst
:
public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source);
}
这样操作只绑定到第一项。
这可以更改为处理 不是 由添加 skip
的流支持的 observables 以跳过已经采取的项目:
public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source.skip(1));
}
observable
.compose(Transformers.doOnFirst(System.out::println))
它已经过单元测试,并且在幕后仅在运算符中使用每个订阅计数器。请注意,每个订阅很重要,因为有很多用例,其中一个可观察实例被多次使用,我们希望每次都应用 doOnFirst
运算符。
源代码为here.
为方便起见,我为 Flowable
和 Observable
创建了这些扩展函数。
请注意,使用 doOnFirst()
将在第一个元素发射之前调用操作,而 doAfterFirst()
将首先发射第一个项目然后执行操作。
fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
用法就这么简单:
Flowable.fromArray(1, 2, 3)
.doOnFirst { System.err.println("First $it") }
.subscribe { println(it) }
输出:
// First 1
// 1
// 2
// 3
并且:
Flowable.fromArray(1, 2, 3)
.doAfterFirst { System.err.println("First $it") }
.subscribe { println(it) }
输出:
// 1
// First 1
// 2
// 3