不同频率的热可观察消费
Hot observable consume with different frequency
我正在为我要完成的任务编写一个简单示例。
假设有一个任务列表 (tasks
),我想 每 1 秒触发一次 。
这可以通过 scheduler
或其他方式来完成。
现在这个流有两个消费者,但是
C1
应该在完成所有任务时触发
C2
应该在每第 n 次完成所有任务时触发。 (也可以是每n秒)
这是一些示例代码。目前,它没有安排重复 - 因为我不知道使用 Observable.repeat
还是 Scheduler
.
更好
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._
object MainTest {
def main(args: Array[String]): Unit = {
def t = (i: Int) => Observable.eval {
print(i)
i
}
val tsks = (1 to 5).map(t)
val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))
val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))
val s = tasks.reduce(_ + _).publish
s.consumeWith(c1).runAsync
s.consumeWith(c2).runAsync
s.connect()
while (true) {
Thread.sleep(1.hour.toMillis)
}
}
}
首先,为了每1秒重复一个任务,你可以做...
Observable.intervalAtFixedRate(1.second)
.flatMap(_ => Observable.eval(???))
为了在所有任务完成时触发,您可以使用 completed
(如果您想要只发出最终完成事件的 Observable[Nothing]
)或 completedL
(如果您想要改为使用 Task[Unit]
)。
有关详细信息,请参阅 API docs。
因此,除了 c1
之外,您还可以这样做:
s.completeL.runAsync
然而,要对源进行采样,您可以使用:
sample
(别名throttleLast
)
sampleRepeated
throttleFirst
debounce
debounceRepeated
echo
echoRepeated
我鼓励你玩这些,从 API docs 开始。
s.sample(10.seconds).doOnNext(println).completedL.runAsync
或者你可以简单地用 takeEveryNth
:
取每第 N 个元素
s.takeEveryNth(20).doOnNext(println).completedL.runAsync
如果这回答了您的问题,请告诉我。
我正在为我要完成的任务编写一个简单示例。
假设有一个任务列表 (tasks
),我想 每 1 秒触发一次 。
这可以通过 scheduler
或其他方式来完成。
现在这个流有两个消费者,但是
C1
应该在完成所有任务时触发C2
应该在每第 n 次完成所有任务时触发。 (也可以是每n秒)
这是一些示例代码。目前,它没有安排重复 - 因为我不知道使用 Observable.repeat
还是 Scheduler
.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._
object MainTest {
def main(args: Array[String]): Unit = {
def t = (i: Int) => Observable.eval {
print(i)
i
}
val tsks = (1 to 5).map(t)
val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))
val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))
val s = tasks.reduce(_ + _).publish
s.consumeWith(c1).runAsync
s.consumeWith(c2).runAsync
s.connect()
while (true) {
Thread.sleep(1.hour.toMillis)
}
}
}
首先,为了每1秒重复一个任务,你可以做...
Observable.intervalAtFixedRate(1.second)
.flatMap(_ => Observable.eval(???))
为了在所有任务完成时触发,您可以使用 completed
(如果您想要只发出最终完成事件的 Observable[Nothing]
)或 completedL
(如果您想要改为使用 Task[Unit]
)。
有关详细信息,请参阅 API docs。
因此,除了 c1
之外,您还可以这样做:
s.completeL.runAsync
然而,要对源进行采样,您可以使用:
sample
(别名throttleLast
)sampleRepeated
throttleFirst
debounce
debounceRepeated
echo
echoRepeated
我鼓励你玩这些,从 API docs 开始。
s.sample(10.seconds).doOnNext(println).completedL.runAsync
或者你可以简单地用 takeEveryNth
:
s.takeEveryNth(20).doOnNext(println).completedL.runAsync
如果这回答了您的问题,请告诉我。