如何将 concat 与 lambda 和 ObservableSource 一起使用
How to use concat with lambda and ObservableSource
RxJava2
kotlin
这工作正常,我可以连接 2 个 observables
Observable.concat(countries(), animals())
.subscribeBy {
println(it)
}
这个示例我无法理解,因为它使用了一个似乎采用 ObservableSource 的 lambda,我想连接 2 个 observable,但它会导致 null 异常。只是想知道我做错了什么。将 lambda 与 concat 一起使用的目的是什么?
Observable.concat<String> {
it.onNext(countries())
it.onNext(animals())
}.subscribeBy {
println(it)
}
private fun animals(): Observable<String> =
Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")
private fun countries(): Observable<String> =
Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
这是我遇到的崩溃:
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
我想我指的是 ObservableSource
的界面。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
非常感谢您的任何建议
将 lambda 传递给 concat 解析为 concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
. Because ObservableSource
is an interface with a single non-default method, this triggers Kotlin's SAM conversion。这就是它选择该重载的原因 - 它是唯一具有可以通过 SAM 转换实现的接口的重载。
因此,lambda 是 ObservableSource.subscribe(Observer<? super T> observer)
方法的一个实现。该方法记录为:
Subscribes the given Observer to this ObservableSource instance.
因此,lambda 需要将参数(it
) 订阅到Observables
的来源。您得到一个 NullPointerException
是因为您没有订阅它,而是开始在尚未订阅的 Observer
上调用 onNext
,因此内部状态不正确(在这种情况下,有一个尚未设置的队列,但这不是特别重要)。
为了实现该方法的契约,您只需创建一个 Observable
来发出 Observable
并订阅 it
(Observer
)那个 Observable
在 lambda 中,像这样:
Observable.concat<String> {
Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
println(it)
}
我已经在本地对此进行了测试,它产生了预期的结果。
RxJava2
kotlin
这工作正常,我可以连接 2 个 observables
Observable.concat(countries(), animals())
.subscribeBy {
println(it)
}
这个示例我无法理解,因为它使用了一个似乎采用 ObservableSource 的 lambda,我想连接 2 个 observable,但它会导致 null 异常。只是想知道我做错了什么。将 lambda 与 concat 一起使用的目的是什么?
Observable.concat<String> {
it.onNext(countries())
it.onNext(animals())
}.subscribeBy {
println(it)
}
private fun animals(): Observable<String> =
Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")
private fun countries(): Observable<String> =
Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
这是我遇到的崩溃:
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
我想我指的是 ObservableSource
的界面。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
非常感谢您的任何建议
将 lambda 传递给 concat 解析为 concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
. Because ObservableSource
is an interface with a single non-default method, this triggers Kotlin's SAM conversion。这就是它选择该重载的原因 - 它是唯一具有可以通过 SAM 转换实现的接口的重载。
因此,lambda 是 ObservableSource.subscribe(Observer<? super T> observer)
方法的一个实现。该方法记录为:
Subscribes the given Observer to this ObservableSource instance.
因此,lambda 需要将参数(it
) 订阅到Observables
的来源。您得到一个 NullPointerException
是因为您没有订阅它,而是开始在尚未订阅的 Observer
上调用 onNext
,因此内部状态不正确(在这种情况下,有一个尚未设置的队列,但这不是特别重要)。
为了实现该方法的契约,您只需创建一个 Observable
来发出 Observable
并订阅 it
(Observer
)那个 Observable
在 lambda 中,像这样:
Observable.concat<String> {
Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
println(it)
}
我已经在本地对此进行了测试,它产生了预期的结果。