如何使 monix 固定速率调度程序在失败时继续

How to make monix fixed rate Scheduler continue upon failure

我刚开始使用 monix,部分原因是为了在较长的 运行 应用程序中安排重复工作。我将管理异常,但我希望 monix 继续调用给定函数,即使我让其中一些异常通过。

现在简单测试一下,一旦调度了重复调用,一旦出现异常就不会继续调用了:

// will print "Hi" repeatedly
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
  println("Hi")
}

// will print "Hi" only once
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
  println("Hi")
  throw new RuntimeException("oups, forgot to catch that one")
}

注意:我创建了调度程序来记录异常和错误

编辑:

我意识到在失败时简单地重复该任务是一个糟糕的设计。相反,我实际上应该设置一个适当的异常管理系统,延迟重启。

现在,我在 Monix 中看不到任何功能可以做到这一点。所以我必须自己做。如果有人有同样的问题,或者有人知道有用的 monix 工具,我会提出这个问题。

您始终可以利用 scala.util.Try 来实现那个或简单的 try-catch 块。在任何失败情况下,您只需登录并继续。您甚至可以采用如下所示的失败重试策略。

import scala.util._

def taskExceptionProne() = ???

var failures = 0
val maxRetries = 10

scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
    Try(taskExceptionProne) match {
        Success(result) =>
            //do something with the result
            failures = 0
            println("Task executed.")
        Failure(throwable) =>
            if (failures>=maxRetries) throw throwable else {
                failures = failures + 1
                //log failure
                println(throwable.getMessage)
            }
    }
}

另一种方法是使用 Observable,这样更容易编写。它还具有许多内置功能,因此您无需手动操作自己的功能。

val myTask: Task[Unit] = Task.delay(println("Execute Task")

// Note that the period will tick after you've completed the task
// So, if you have a long running task, just make sure you are aware of that
Observable
  .intervalAtFixedRate(1.seconds, 1.seconds)
  .mapEval(_ => myTask.onErrorRestart(maxRetries = 5))
  .completedL
  .startAndForget // Optional, will run in a separate fiber, so it doesn't block main thread
  .runToFuture