理解 monix 消费者负载均衡

Understanding monix consumer load balance

我正在学习monix 3
下一个代码:

object Main extends TaskApp {
  override def runc = {
    Observable.fromIterable(1 to 10)
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Starting $i, delay = $delay")
        Thread.sleep(delay)     // Imitation of hard execution
        i
      }
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Continue $i, delay = $delay")
        Thread.sleep(delay)
        i
      }
      .consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))   //Compile error here
  }
}

导致编译错误:

missing parameter type
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))

我想不通,这里出了什么问题,如何编译这段代码?

UPD
第二个问题是如何每 n 分钟重复一次这个流?

作为第一个问题的答案,您必须明确地为 foreach 提供一个类型参数:

Consumer.foreach[Int](i => println(s"End $i"))

要回答您的第二个问题,请使用 Observable.intervalAtFixedRateObservable.intervalAtFixedDelay

请参考Monix Scaladoc

希望对您有所帮助。