如何将 concat 与 lambda 和 ObservableSource 一起使用

How to use concat with lambda and ObservableSource

RxJava2
kotlin 

这工作正常,我可以连接 2 个 observables

   Observable.concat(countries(), animals())
                    .subscribeBy {
                        println(it)
                    }

这个示例我无法理解,因为它使用了一个似乎采用 ObservableSource 的 lambda,我想连接 2 个 observable,但它会导致 null 异常。只是想知道我做错了什么。将 lambda 与 concat 一起使用的目的是什么?

        Observable.concat<String> {
                    it.onNext(countries())
                    it.onNext(animals())
                }.subscribeBy {
                    println(it)
                }


    private fun animals(): Observable<String> =
            Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")

    private fun countries(): Observable<String> =
            Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")

这是我遇到的崩溃:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)

我想我指的是 ObservableSource 的界面。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

非常感谢您的任何建议

将 lambda 传递给 concat 解析为 concat(ObservableSource<? extends ObservableSource<? extends T>> sources). Because ObservableSource is an interface with a single non-default method, this triggers Kotlin's SAM conversion。这就是它选择该重载的原因 - 它是唯一具有可以通过 SAM 转换实现的接口的重载。

因此,lambda 是 ObservableSource.subscribe(Observer<? super T> observer) 方法的一个实现。该方法记录为:

Subscribes the given Observer to this ObservableSource instance.

因此,lambda 需要将参数(it) 订阅到Observables 的来源。您得到一个 NullPointerException 是因为您没有订阅它,而是开始在尚未订阅的 Observer 上调用 onNext,因此内部状态不正确(在这种情况下,有一个尚未设置的队列,但这不是特别重要)。

为了实现该方法的契约,您只需创建一个 Observable 来发出 Observable 并订阅 itObserver)那个 Observable 在 lambda 中,像这样:

Observable.concat<String> {
    Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
    println(it)
}

我已经在本地对此进行了测试,它产生了预期的结果。