如何使 monix 固定速率调度程序在失败时继续
How to make monix fixed rate Scheduler continue upon failure
我刚开始使用 monix,部分原因是为了在较长的 运行 应用程序中安排重复工作。我将管理异常,但我希望 monix 继续调用给定函数,即使我让其中一些异常通过。
现在简单测试一下,一旦调度了重复调用,一旦出现异常就不会继续调用了:
// will print "Hi" repeatedly
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
println("Hi")
}
// will print "Hi" only once
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
println("Hi")
throw new RuntimeException("oups, forgot to catch that one")
}
注意:我创建了调度程序来记录异常和错误
编辑:
我意识到在失败时简单地重复该任务是一个糟糕的设计。相反,我实际上应该设置一个适当的异常管理系统,延迟重启。
现在,我在 Monix 中看不到任何功能可以做到这一点。所以我必须自己做。如果有人有同样的问题,或者有人知道有用的 monix 工具,我会提出这个问题。
您始终可以利用 scala.util.Try 来实现那个或简单的 try-catch 块。在任何失败情况下,您只需登录并继续。您甚至可以采用如下所示的失败重试策略。
import scala.util._
def taskExceptionProne() = ???
var failures = 0
val maxRetries = 10
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
Try(taskExceptionProne) match {
Success(result) =>
//do something with the result
failures = 0
println("Task executed.")
Failure(throwable) =>
if (failures>=maxRetries) throw throwable else {
failures = failures + 1
//log failure
println(throwable.getMessage)
}
}
}
另一种方法是使用 Observable
,这样更容易编写。它还具有许多内置功能,因此您无需手动操作自己的功能。
val myTask: Task[Unit] = Task.delay(println("Execute Task")
// Note that the period will tick after you've completed the task
// So, if you have a long running task, just make sure you are aware of that
Observable
.intervalAtFixedRate(1.seconds, 1.seconds)
.mapEval(_ => myTask.onErrorRestart(maxRetries = 5))
.completedL
.startAndForget // Optional, will run in a separate fiber, so it doesn't block main thread
.runToFuture
我刚开始使用 monix,部分原因是为了在较长的 运行 应用程序中安排重复工作。我将管理异常,但我希望 monix 继续调用给定函数,即使我让其中一些异常通过。
现在简单测试一下,一旦调度了重复调用,一旦出现异常就不会继续调用了:
// will print "Hi" repeatedly
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
println("Hi")
}
// will print "Hi" only once
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
println("Hi")
throw new RuntimeException("oups, forgot to catch that one")
}
注意:我创建了调度程序来记录异常和错误
编辑:
我意识到在失败时简单地重复该任务是一个糟糕的设计。相反,我实际上应该设置一个适当的异常管理系统,延迟重启。
现在,我在 Monix 中看不到任何功能可以做到这一点。所以我必须自己做。如果有人有同样的问题,或者有人知道有用的 monix 工具,我会提出这个问题。
您始终可以利用 scala.util.Try 来实现那个或简单的 try-catch 块。在任何失败情况下,您只需登录并继续。您甚至可以采用如下所示的失败重试策略。
import scala.util._
def taskExceptionProne() = ???
var failures = 0
val maxRetries = 10
scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
Try(taskExceptionProne) match {
Success(result) =>
//do something with the result
failures = 0
println("Task executed.")
Failure(throwable) =>
if (failures>=maxRetries) throw throwable else {
failures = failures + 1
//log failure
println(throwable.getMessage)
}
}
}
另一种方法是使用 Observable
,这样更容易编写。它还具有许多内置功能,因此您无需手动操作自己的功能。
val myTask: Task[Unit] = Task.delay(println("Execute Task")
// Note that the period will tick after you've completed the task
// So, if you have a long running task, just make sure you are aware of that
Observable
.intervalAtFixedRate(1.seconds, 1.seconds)
.mapEval(_ => myTask.onErrorRestart(maxRetries = 5))
.completedL
.startAndForget // Optional, will run in a separate fiber, so it doesn't block main thread
.runToFuture