如何将具有不同回调的多个 Observable 合并为一个 Stream?

How to Merge multiple Observables with diferents call backs into one single Stream?

我有这个问题。我正在尝试使用远程应用程序将本地数据库同步到我的 android 应用程序中。我正在创建逻辑来上传本地创建的新信息,服务器用远程 ID 响应并将其保存在服务器中。为了对此进行存档,我使用了一种方法,该方法采用一个对象数组和一个 return 一个 Observable,它为每个元素发出服务器的响应。 像这样。

val dailyEntries = App.db.dailyEntryDao().getDailyEntriesCreated()
            dailyEntries.sync(context) //Return an observable
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe({                        
                        val response = DailyEntry(it)//Creates a daily entry using 
                                                      the response from the server
                        thread {                                
                          App.db.dailyEntryDao().update(response)
                        }
                    }, {
                        it.printStackTrace()
                    }, {
                        uploadEnclosures()
                    })

您怎么看,在当前可观察对象的 onSuccess 中调用了另一个方法。它使用相同的逻辑并显示在前面。

private fun uploadEnclosures() {
        thread {
            val enclosures = App.db.enclosureDao().getEnclosuresCreated()
            enclosures.sync(context)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe({
                        val response = Enclosure(it)
                        thread {                                
                            App.db.enclosureDao().update(response)
                        }
                    }, {
                        it.printStackTrace()
                    }, {
                        uploadSmokeTest()
                    })
        }
    }

它继续所有 tables。我们总是在当前 Observable 的 onSuccess 中执行下一个 table 的更新。这样做是因为我需要按特定顺序进行同步。

我的问题是,有没有办法将所有这些 Observables 合并到一个中,以执行单个订阅并控制每个 onNext 情绪?

感谢您的回答

嗯,是的,但是需要做一些工作,您可以使用 concat 运算符来为您处理排序,并按顺序将 observables 的列表传递给它,并且然后使用一个期望 Any 事件滴入它的观察者订阅它。

为了严格类型安全,您可以使用通用接口标记您的可观察源类型,并使用实例检查来执行特定于事件类型的操作。

查看更多here

代码示例-

fun concatCalls(): Observable<Any> {
    return Observable.concat(src1, src2, ...)
}

那么消费者会是这样的-

concatCalls().subscribe(object: Subscriber<Any> {
    override fun onNext(o: Any) {
       when (o) {
           is Object1 -> // do handling for stuff emitted by src1
           is Object2 -> // do handling for stuff emitted by src2
           ....
           else // skip
    }
    ....
})