如何将具有不同回调的多个 Observable 合并为一个 Stream?
How to Merge multiple Observables with diferents call backs into one single Stream?
我有这个问题。我正在尝试使用远程应用程序将本地数据库同步到我的 android 应用程序中。我正在创建逻辑来上传本地创建的新信息,服务器用远程 ID 响应并将其保存在服务器中。为了对此进行存档,我使用了一种方法,该方法采用一个对象数组和一个 return 一个 Observable,它为每个元素发出服务器的响应。
像这样。
val dailyEntries = App.db.dailyEntryDao().getDailyEntriesCreated()
dailyEntries.sync(context) //Return an observable
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = DailyEntry(it)//Creates a daily entry using
the response from the server
thread {
App.db.dailyEntryDao().update(response)
}
}, {
it.printStackTrace()
}, {
uploadEnclosures()
})
您怎么看,在当前可观察对象的 onSuccess 中调用了另一个方法。它使用相同的逻辑并显示在前面。
private fun uploadEnclosures() {
thread {
val enclosures = App.db.enclosureDao().getEnclosuresCreated()
enclosures.sync(context)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = Enclosure(it)
thread {
App.db.enclosureDao().update(response)
}
}, {
it.printStackTrace()
}, {
uploadSmokeTest()
})
}
}
它继续所有 tables。我们总是在当前 Observable 的 onSuccess 中执行下一个 table 的更新。这样做是因为我需要按特定顺序进行同步。
我的问题是,有没有办法将所有这些 Observables 合并到一个中,以执行单个订阅并控制每个 onNext 情绪?
感谢您的回答
嗯,是的,但是需要做一些工作,您可以使用 concat
运算符来为您处理排序,并按顺序将 observables
的列表传递给它,并且然后使用一个期望 Any
事件滴入它的观察者订阅它。
为了严格类型安全,您可以使用通用接口标记您的可观察源类型,并使用实例检查来执行特定于事件类型的操作。
查看更多here
代码示例-
fun concatCalls(): Observable<Any> {
return Observable.concat(src1, src2, ...)
}
那么消费者会是这样的-
concatCalls().subscribe(object: Subscriber<Any> {
override fun onNext(o: Any) {
when (o) {
is Object1 -> // do handling for stuff emitted by src1
is Object2 -> // do handling for stuff emitted by src2
....
else // skip
}
....
})
我有这个问题。我正在尝试使用远程应用程序将本地数据库同步到我的 android 应用程序中。我正在创建逻辑来上传本地创建的新信息,服务器用远程 ID 响应并将其保存在服务器中。为了对此进行存档,我使用了一种方法,该方法采用一个对象数组和一个 return 一个 Observable,它为每个元素发出服务器的响应。 像这样。
val dailyEntries = App.db.dailyEntryDao().getDailyEntriesCreated()
dailyEntries.sync(context) //Return an observable
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = DailyEntry(it)//Creates a daily entry using
the response from the server
thread {
App.db.dailyEntryDao().update(response)
}
}, {
it.printStackTrace()
}, {
uploadEnclosures()
})
您怎么看,在当前可观察对象的 onSuccess 中调用了另一个方法。它使用相同的逻辑并显示在前面。
private fun uploadEnclosures() {
thread {
val enclosures = App.db.enclosureDao().getEnclosuresCreated()
enclosures.sync(context)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = Enclosure(it)
thread {
App.db.enclosureDao().update(response)
}
}, {
it.printStackTrace()
}, {
uploadSmokeTest()
})
}
}
它继续所有 tables。我们总是在当前 Observable 的 onSuccess 中执行下一个 table 的更新。这样做是因为我需要按特定顺序进行同步。
我的问题是,有没有办法将所有这些 Observables 合并到一个中,以执行单个订阅并控制每个 onNext 情绪?
感谢您的回答
嗯,是的,但是需要做一些工作,您可以使用 concat
运算符来为您处理排序,并按顺序将 observables
的列表传递给它,并且然后使用一个期望 Any
事件滴入它的观察者订阅它。
为了严格类型安全,您可以使用通用接口标记您的可观察源类型,并使用实例检查来执行特定于事件类型的操作。
查看更多here
代码示例-
fun concatCalls(): Observable<Any> {
return Observable.concat(src1, src2, ...)
}
那么消费者会是这样的-
concatCalls().subscribe(object: Subscriber<Any> {
override fun onNext(o: Any) {
when (o) {
is Object1 -> // do handling for stuff emitted by src1
is Object2 -> // do handling for stuff emitted by src2
....
else // skip
}
....
})