理解 monix 消费者负载均衡
Understanding monix consumer load balance
我正在学习monix 3
。
下一个代码:
object Main extends TaskApp {
override def runc = {
Observable.fromIterable(1 to 10)
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Starting $i, delay = $delay")
Thread.sleep(delay) // Imitation of hard execution
i
}
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Continue $i, delay = $delay")
Thread.sleep(delay)
i
}
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i")))) //Compile error here
}
}
导致编译错误:
missing parameter type
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))
我想不通,这里出了什么问题,如何编译这段代码?
UPD
第二个问题是如何每 n
分钟重复一次这个流?
作为第一个问题的答案,您必须明确地为 foreach
提供一个类型参数:
Consumer.foreach[Int](i => println(s"End $i"))
要回答您的第二个问题,请使用 Observable.intervalAtFixedRate
或 Observable.intervalAtFixedDelay
。
请参考Monix Scaladoc。
希望对您有所帮助。
我正在学习monix 3
。
下一个代码:
object Main extends TaskApp {
override def runc = {
Observable.fromIterable(1 to 10)
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Starting $i, delay = $delay")
Thread.sleep(delay) // Imitation of hard execution
i
}
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Continue $i, delay = $delay")
Thread.sleep(delay)
i
}
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i")))) //Compile error here
}
}
导致编译错误:
missing parameter type
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))
我想不通,这里出了什么问题,如何编译这段代码?
UPD
第二个问题是如何每 n
分钟重复一次这个流?
作为第一个问题的答案,您必须明确地为 foreach
提供一个类型参数:
Consumer.foreach[Int](i => println(s"End $i"))
要回答您的第二个问题,请使用 Observable.intervalAtFixedRate
或 Observable.intervalAtFixedDelay
。
请参考Monix Scaladoc。
希望对您有所帮助。