如何按键连接两个 RxJava2 Obvervables?
How do I join two RxJava2 Obvervables by key?
我有两个不同类型的未排序的可观察对象。这两种类型共享一个公共密钥。我想将它们加入一个新的可观察发射对的相应元素,但我不知道该怎么做。
请注意,某些键可能丢失了。如果不删除完整的对就可以了,但是用 null
代替丢失的部分会更好。
输入 1:
Entity(id = 2),
Entity(id = 1),
Entity(id = 4)
输入 2:
Dto(id = 3),
Dto(id = 2),
Dto(id = 1)
预期输出(任意顺序):
Pair(Entity(id = 1), Dto(id = 1)),
Pair(Entity(id = 2), Dto(id = 2)),
Pair(null, Dto(id = 3)),
Pair(Entity(id = 4), null)
首先,Observable.merge
一起流式传输:这为您提供了所有项目的流式传输。 (在下面的代码中,我使用自定义 Either
class 来标记每个流。)
然后,对于流中的每个项目,尝试将其与先前观察到的其他类型的项目进行匹配,并输出该对。如果没有,请保存以备后用。
最后,流完成后,剩余的未匹配元素将不会与任何东西匹配,因此它们可以不成对地发射。
import io.reactivex.Observable
data class Entity(val id: Int)
data class Dto(val id: Int)
sealed class Either<out A, out B>
data class Left<A>(val value: A) : Either<A, Nothing>()
data class Right<B>(val value: B) : Either<Nothing, B>()
fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> {
val unmatchedA = mutableMapOf<C, A>()
val unmatchedB = mutableMapOf<C, B>()
val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest ->
when (latest) {
is Left -> {
val id = idA(latest.value)
unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) }
unmatchedA.put(id, latest.value)
}
is Right -> {
val id = idB(latest.value)
unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) }
unmatchedB.put(id, latest.value)
}
}
Observable.empty<Nothing>()
}
return Observable.concat(merged, Observable.create { emitter ->
unmatchedA.values.forEach { emitter.onNext(it to null) }
unmatchedB.values.forEach { emitter.onNext(null to it) }
emitter.onComplete()
})
}
fun main(args: Array<String>) {
val entities = Observable.just(Entity(2), Entity(1), Entity(4))
val dtos = Observable.just(Dto(3), Dto(2), Dto(1))
joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println)
}
(Entity(id=2), Dto(id=2))
(Entity(id=1), Dto(id=1))
(Entity(id=4), null)
(null, Dto(id=3))
请注意,如果 ID 在流中重复,这可能会有一些奇怪的行为,并且根据流的结构,这可能最终会在内存中缓冲大量元素。
我有两个不同类型的未排序的可观察对象。这两种类型共享一个公共密钥。我想将它们加入一个新的可观察发射对的相应元素,但我不知道该怎么做。
请注意,某些键可能丢失了。如果不删除完整的对就可以了,但是用 null
代替丢失的部分会更好。
输入 1:
Entity(id = 2),
Entity(id = 1),
Entity(id = 4)
输入 2:
Dto(id = 3),
Dto(id = 2),
Dto(id = 1)
预期输出(任意顺序):
Pair(Entity(id = 1), Dto(id = 1)),
Pair(Entity(id = 2), Dto(id = 2)),
Pair(null, Dto(id = 3)),
Pair(Entity(id = 4), null)
首先,Observable.merge
一起流式传输:这为您提供了所有项目的流式传输。 (在下面的代码中,我使用自定义 Either
class 来标记每个流。)
然后,对于流中的每个项目,尝试将其与先前观察到的其他类型的项目进行匹配,并输出该对。如果没有,请保存以备后用。
最后,流完成后,剩余的未匹配元素将不会与任何东西匹配,因此它们可以不成对地发射。
import io.reactivex.Observable
data class Entity(val id: Int)
data class Dto(val id: Int)
sealed class Either<out A, out B>
data class Left<A>(val value: A) : Either<A, Nothing>()
data class Right<B>(val value: B) : Either<Nothing, B>()
fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> {
val unmatchedA = mutableMapOf<C, A>()
val unmatchedB = mutableMapOf<C, B>()
val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest ->
when (latest) {
is Left -> {
val id = idA(latest.value)
unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) }
unmatchedA.put(id, latest.value)
}
is Right -> {
val id = idB(latest.value)
unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) }
unmatchedB.put(id, latest.value)
}
}
Observable.empty<Nothing>()
}
return Observable.concat(merged, Observable.create { emitter ->
unmatchedA.values.forEach { emitter.onNext(it to null) }
unmatchedB.values.forEach { emitter.onNext(null to it) }
emitter.onComplete()
})
}
fun main(args: Array<String>) {
val entities = Observable.just(Entity(2), Entity(1), Entity(4))
val dtos = Observable.just(Dto(3), Dto(2), Dto(1))
joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println)
}
(Entity(id=2), Dto(id=2))
(Entity(id=1), Dto(id=1))
(Entity(id=4), null)
(null, Dto(id=3))
请注意,如果 ID 在流中重复,这可能会有一些奇怪的行为,并且根据流的结构,这可能最终会在内存中缓冲大量元素。