如何在 RxJava (RxScala) 中实现 observeLatestOn?

How to implement observeLatestOn in RxJava (RxScala)?

我正在尝试在 RxJava(实际上是 RxScala)中实现 ObserveLatestOn 运算符。

当我们有一个快速的生产者和一个慢速的订阅者时,这个运算符很有用,但订阅者不关心它在消费一个项目时丢失的任何项目。

大理石图:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

=字符表示订阅者正在执行一项较长的运行工作,>字符表示刚刚完成的工作。作为典型的用法示例,想象一些需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器。渲染时间比较长,但是我们不需要在屏幕上渲染每一步,最后一步就完美了。

在上面的弹珠图中,生产者发信号1,订阅者开始处理,耗时较长。同时,生产者发出 2 和 3,之后订阅者才完成工作。它看到生产者发出的最后一个项目是 3,所以它开始处理它。那很快,这期间还没有新的item产生,订阅者就可以休息了。然后,5来了,故事以同样的方式继续。

我花了几个小时尝试实现这个看似简单的运算符,但我仍然不满意。运算符的本质表明它应该是异步的,它应该在与接收它们的调度程序不同的调度程序上发出它的项目。但与此同时,我当然不希望在没有工作可做的情况下让线程被工作人员占用。

这是我到目前为止的想法:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

这似乎可行,但我不确定是否没有竞争条件或死锁。另外,我不确定解决方案是否可以变得更简单。我也一直在考虑另一种方法,比如

我也不知道如何为此编写确定性单元测试。 scheduleRec 调度的工作在与TestScheduler 一起使用时不能被打断,我需要使用一个真正在不同线程上工作的调度器。我发现很难为多线程代码的竞争条件编写正确的单元测试。

所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法呢?以及如何测试它的正确性?

我建议使用 lift 来实现这个运算符。这是我的解决方案:

package object ObservableEx {

  implicit class ObserveLatestOn[T](val o: Observable[T]) {

    def observeLatestOn(scheduler: Scheduler): Observable[T] = {
      o.lift { (child: Subscriber[T]) =>
        val worker = scheduler.createWorker
        child.add(worker)

        val parent = new Subscriber[T] {

          private val lock = new AnyRef

          // protected by "lock"
          private var latest: Notification[T] = null
          // protected by "lock"
          // Means no task runs in the worker
          private var idle = true

          private var done = false

          override def onStart(): Unit = {
            request(Long.MaxValue)
          }

          override def onNext(v: T): Unit = {
            if (!done) {
              emit(OnNext(v))
            }
          }

          override def onCompleted(): Unit = {
            if (!done) {
              done = true
              emit(OnCompleted)
            }
          }

          override def onError(e: Throwable): Unit = {
            if (!done) {
              done = true
              emit(OnError(e))
            }
          }

          def emit(v: Notification[T]): Unit = {
            var shouldSchedule = false
            lock.synchronized {
              latest = v
              if (idle) {
                // worker is idle so we should schedule a task
                shouldSchedule = true
                // We will schedule a task, so the worker will be busy
                idle = false
              }
            }
            if (shouldSchedule) {
              worker.schedule {
                var n: Notification[T] = null
                var exit = false
                while (!exit) {
                  lock.synchronized {
                    if (latest == null) {
                      // No new item arrives and we are leaving the worker, so set "idle"
                      idle = true
                      exit = true
                    } else {
                      n = latest
                      latest = null
                    }
                  }
                  if (!exit) {
                    n.accept(child)
                  }
                }
              }
            }
          }
        }

        child.add(parent)

        parent
      }
    }
  }

}

还有一个单元测试

import ObservableEx.ObserveLatestOn

@Test
def testObserveLatestOn(): Unit = {
  val scheduler = TestScheduler()
  val xs = mutable.ArrayBuffer[Long]()
  var completed = false
  Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
    scheduler.advanceTimeBy(200 milliseconds)
    xs += v
  },
    e => e.printStackTrace(),
    () => completed = true
  )
  scheduler.advanceTimeBy(100 milliseconds)
  assert(completed === true)
  assert(xs === List(0, 2, 4, 6, 8))
}

我有一个 PR,其中运算符 onBackpressureLatest() 应该具有预期的行为,但您需要并发性并且可以像往常一样使用 observeOn