RxJava - 将 Observable 转换为 Iterator、Stream 或 Sequence
RxJava- Turn Observable into Iterator, Stream, or Sequence
我知道这违反了很多 Rx 规则,但我真的很喜欢 RxJava-JDBC,我的队友也是。关系数据库是我们所做工作的核心,Rx 也是如此。
然而,在某些情况下,我们不想作为 Observable<ResultSet>
发出,而只想使用基于拉动的 Java 8 Stream<ResultSet>
或 Kotlin Sequence<ResultSet>
.但是我们非常习惯RxJava-JDBC库,它只有returns一个Observable<ResultSet>
。
因此,我想知道是否有一种方法可以使用扩展函数将 Observable<ResultSet>
变成 Sequence<ResultSet>
,而不进行任何中间收集或 toBlocking()
调用。以下是我到目前为止的所有内容,但我现在正在尝试连接基于推送和拉取的系统,但我无法缓冲,因为 ResultSet
对每个 onNext()
调用都是有状态的。这是不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()
我不确定这是实现您想要的效果的最简单方法,但您可以尝试此代码。它通过创建阻塞队列并将来自 Observable
的所有事件发布到此队列来将 Observable
转换为 Iterator
。 Iterable
从队列中拉取事件,如果有 none 则阻塞。然后它根据接收到的当前事件修改自己的状态。
class ObservableIterator<T>(
observable: Observable<T>,
scheduler: Scheduler
) : Iterator<T>, Closeable {
private val queue = LinkedBlockingQueue<Notification<T>>()
private var cached: Notification<T>? = null
private var completed: Boolean = false
private val subscription =
observable
.materialize()
.subscribeOn(scheduler)
.subscribe({ queue.put(it) })
override fun hasNext(): Boolean {
cacheNext()
return !completed
}
override fun next(): T {
cacheNext()
val notification = cached ?: throw NoSuchElementException()
check(notification.isOnNext)
cached = null
return notification.value
}
private fun cacheNext() {
if (completed) {
return
}
if (cached == null) {
queue.take().let { notification ->
if (notification.isOnError) {
completed = true
throw RuntimeException(notification.throwable)
} else if (notification.isOnCompleted) {
completed = true
} else {
cached = notification
}
}
}
}
override fun close() {
subscription.unsubscribe()
completed = true
cached = null
}
}
您可以使用以下辅助函数:
fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }
当为迭代器调用返回的序列时,将订阅可观察对象。
如果一个 observable 在它订阅的同一线程上发出元素(例如 Observable.just
),它将在它有机会返回之前填充迭代器的缓冲区。
在这种情况下,您可能需要通过调用 subscribeOn
:
来直接订阅不同的线程
observable.subscribeOn(scheduler).asSequence()
但是,虽然 toBlocking().getIterator()
不会缓冲所有结果,但如果迭代器没有及时使用它们,它可以缓冲其中的一些结果。如果 ResultSet
在下一个 ResultSet
到达时以某种方式过期,这可能是个问题。
我知道这违反了很多 Rx 规则,但我真的很喜欢 RxJava-JDBC,我的队友也是。关系数据库是我们所做工作的核心,Rx 也是如此。
然而,在某些情况下,我们不想作为 Observable<ResultSet>
发出,而只想使用基于拉动的 Java 8 Stream<ResultSet>
或 Kotlin Sequence<ResultSet>
.但是我们非常习惯RxJava-JDBC库,它只有returns一个Observable<ResultSet>
。
因此,我想知道是否有一种方法可以使用扩展函数将 Observable<ResultSet>
变成 Sequence<ResultSet>
,而不进行任何中间收集或 toBlocking()
调用。以下是我到目前为止的所有内容,但我现在正在尝试连接基于推送和拉取的系统,但我无法缓冲,因为 ResultSet
对每个 onNext()
调用都是有状态的。这是不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()
我不确定这是实现您想要的效果的最简单方法,但您可以尝试此代码。它通过创建阻塞队列并将来自 Observable
的所有事件发布到此队列来将 Observable
转换为 Iterator
。 Iterable
从队列中拉取事件,如果有 none 则阻塞。然后它根据接收到的当前事件修改自己的状态。
class ObservableIterator<T>(
observable: Observable<T>,
scheduler: Scheduler
) : Iterator<T>, Closeable {
private val queue = LinkedBlockingQueue<Notification<T>>()
private var cached: Notification<T>? = null
private var completed: Boolean = false
private val subscription =
observable
.materialize()
.subscribeOn(scheduler)
.subscribe({ queue.put(it) })
override fun hasNext(): Boolean {
cacheNext()
return !completed
}
override fun next(): T {
cacheNext()
val notification = cached ?: throw NoSuchElementException()
check(notification.isOnNext)
cached = null
return notification.value
}
private fun cacheNext() {
if (completed) {
return
}
if (cached == null) {
queue.take().let { notification ->
if (notification.isOnError) {
completed = true
throw RuntimeException(notification.throwable)
} else if (notification.isOnCompleted) {
completed = true
} else {
cached = notification
}
}
}
}
override fun close() {
subscription.unsubscribe()
completed = true
cached = null
}
}
您可以使用以下辅助函数:
fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }
当为迭代器调用返回的序列时,将订阅可观察对象。
如果一个 observable 在它订阅的同一线程上发出元素(例如 Observable.just
),它将在它有机会返回之前填充迭代器的缓冲区。
在这种情况下,您可能需要通过调用 subscribeOn
:
observable.subscribeOn(scheduler).asSequence()
但是,虽然 toBlocking().getIterator()
不会缓冲所有结果,但如果迭代器没有及时使用它们,它可以缓冲其中的一些结果。如果 ResultSet
在下一个 ResultSet
到达时以某种方式过期,这可能是个问题。