不同频率的热可观察消费

Hot observable consume with different frequency

我正在为我要完成的任务编写一个简单示例。 假设有一个任务列表 (tasks),我想 每 1 秒触发一次

这可以通过 scheduler 或其他方式来完成。

现在这个流有两个消费者,但是

这是一些示例代码。目前,它没有安排重复 - 因为我不知道使用 Observable.repeat 还是 Scheduler.

更好
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}

import scala.concurrent.duration._

object MainTest {

  def main(args: Array[String]): Unit = {

    def t = (i: Int) => Observable.eval {
      print(i)
      i
    }

    val tsks = (1 to 5).map(t)

    val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))

    val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
    val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))

    val s = tasks.reduce(_ + _).publish

    s.consumeWith(c1).runAsync
    s.consumeWith(c2).runAsync
    s.connect()

    while (true) {
      Thread.sleep(1.hour.toMillis)
    }
  }

}

首先,为了每1秒重复一个任务,你可以做...

Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => Observable.eval(???))

为了在所有任务完成时触发,您可以使用 completed(如果您想要只发出最终完成事件的 Observable[Nothing])或 completedL(如果您想要改为使用 Task[Unit])。 有关详细信息,请参阅 API docs

因此,除了 c1 之外,您还可以这样做:

s.completeL.runAsync

然而,要对源进行采样,您可以使用:

  • sample(别名throttleLast
  • sampleRepeated
  • throttleFirst
  • debounce
  • debounceRepeated
  • echo
  • echoRepeated

我鼓励你玩这些,从 API docs 开始。

s.sample(10.seconds).doOnNext(println).completedL.runAsync

或者你可以简单地用 takeEveryNth:

取每第 N 个元素
s.takeEveryNth(20).doOnNext(println).completedL.runAsync

如果这回答了您的问题,请告诉我。