具有一系列依赖的未来和非未来方法调用的 Scala 异常处理

scala exception handling with series of dependent future and non-future method calls

我有一个方法可能 return Future - 成功或失败甚至可以抛出异常。我可以通过将 try catch 块放在整个方法和 return Future 上来避免这种情况,但我现在想避免它。我在调用这种方法时遇到的问题很少:

1) 在调用者代码中,如果我使用 map 我期望执行一个方法并期望一个 Future 或一个我试图以下列方式处理的异常:

object ETLCoordinator {      

  private def getBusinessListFromModules(modulePaths: Iterable[File]) : Future[String] = {
    implicit val ec = ExecutionContext.global
    println("Inside getBusinessListFromModules..")
    throw new java.lang.RuntimeException("failed to get businesses") //Exception is thrown before future was constructed
    Future("ok")
  }

  def main(args: Array[String]) {

    println("Inside Future Test..")
    implicit val ec = ExecutionContext.global
    val modulePaths = Iterable(new File("mdrqaint/MDR/QAINT/QAINTX"))

    val fut1 = getBusinessListFromModules(modulePaths) //This is outside of try and which should be okay
    try { 
      fut1.map { res =>
        println("things after Successful fut1")
      }.recover{
        case t: Throwable => println("Failed future in fut1: "+ t.getMessage)
      }  
    } catch {
      case t: Throwable => println("Exception in fut1: "+ t.getMessage)
    }
  }    
}

输出:(没有执行上面的 recover 或 catch 块)

Inside Future Test..
Inside getBusinessListFromModules..
Exception in thread "main" java.lang.RuntimeException: failed to get businesses

但是如果我把 val fut1 = getBusinessListFromModules(modulePaths) 放在 Try 块中,那么异常就会被捕获到 Catch 块中,我得到输出:

Inside Future Test..
Inside getBusinessListFromModules..
Exception in fut1: failed to get businesses

这是为什么?我虽然 Future 执行发生在调用它的一些方法时,如 map、flatmap、onSuccess、onComplete 等。在这种情况下,对 map 的调用已经在 Try 块中。

2) 定义和调用此类方法的更好方法是什么? Try/catch 在调用者中阻塞还是在方法本身中 try/catch?或任何其他方式。我尝试将调用方法包装在 Future 中,以便在调用者中获得 Future[Future[String]] 。我能够避免所有的 try-catch。

val fut1 = Future(getBusinessListFromModules(modulePaths))
//try {
  fut1.map { res =>
    res.map{ str =>
      println("things after Successful fut1")  
    }.recover{
      case t: Throwable => println("Failed in future of fut1: "+ t.getMessage)
    } 
    println("things after Successful fut1 wrapper")
  }.recover{
    case t: Throwable => println("Failed to create future in fut1: "+ t.getMessage)
  } 

3) 如果中间有另一种方法委托给 getBusinessListFromModules 但它本身是非未来方法。

object ETLController {

  private def getBusinessListFromModules(modulePaths: Iterable[File]) : Future[String] = {
    implicit val ec = ExecutionContext.global
    println("Inside getBusinessListFromModules..")
    //throw new java.lang.RuntimeException("failed to get businesses")
    Future("ok")
  }

  private def callGetBusList(modulePaths: Iterable[File]) : String = {

    implicit val ec = ExecutionContext.global
    val etlF = getBusinessListFromModules(modulePaths)    

    etlF onComplete { 
      case Success(itr) => {
        println("Future getBusinessListFromModules success: "+ itr)
        throw new java.lang.RuntimeException("RTE from callGetBusList")
      }
      case Failure(t) => {
        println("Future getBusinessListFromModules throws an error")
      }
    }

    "callGetBusList was a success"
  }

  def main(args: Array[String]) {

    println("Inside Future Test..")
    implicit val ec = ExecutionContext.global
    val modulePaths = Iterable(new File("mdrqaint/MDR/QAINT/QAINTX"))
    try {
      val fut = Future(callGetBusList(modulePaths))
      fut.map { res =>
        println("successful future!")
      }.recover{
        case t: Throwable => println("Failed future: "+ t.getMessage)
      }
    } catch {
      case t: Throwable =>   println("callGetBusList failed:" + t.getMessage)
    }

  }    
}

输出:(没有恢复或捕获块执行!)

Inside Future Test..
Inside getBusinessListFromModules..
Future getBusinessListFromModules success: ok
java.lang.RuntimeException: RTE from callGetBusList
    at ..
successful future!

我什至尝试双重包装 Future 调用 :

val fut = Future(Future(callGetBusList(modulePaths)))
fut.map { res =>
  res.map { str =>
    println("successful inner future! "+ str)
  }.recover{
    case t: Throwable => println("Failed inner future: "+ t.getMessage)
  }
  println("successful outer future!")
}.recover{
  case t: Throwable => println("Failed outer future: "+ t.getMessage)
}

输出:

Future getBusinessListFromModules success: ok
java.lang.RuntimeException: RTE from callGetBusList
    at 
successful inner future! callGetBusList was a success
successful outer future!

我得到 "callGetBusList was a success" 似乎 RuntimeExceptiononComplete 方法中丢失了!我如何在最终调用者中捕获它?处理此类未来依赖关系的更好做法是什么?

更新: 基于 @dk14 的解释,选择将中间方法转换为 return Future,基本上所有方法都转换为 return 某种 Future 而不是普通的异常。

object ETLController {

  private def getBusinessListFromModules(modulePaths: Iterable[File]) : Future[String] = {
    implicit val ec = ExecutionContext.global
    println("Inside getBusinessListFromModules..")
    Future {
      Thread.sleep(2000)
      throw new java.lang.RuntimeException("failed to get businesses")
      "ok"
    }
  }

  private def callGetBusList(modulePaths: Iterable[File]) : Future[String] = {

    implicit val ec = ExecutionContext.global
    val etlF = getBusinessListFromModules(modulePaths)    

    etlF map { itr => 
        println("Future getBusinessListFromModules success: "+ itr)
        throw new java.lang.RuntimeException("RTE from callGetBusList")
      } recover {
      case t: Throwable => {
        println("Future callGetBusList throws an error: " + t.getMessage)
        throw t
      }
    }    
  }

  def main(args: Array[String]) {

    println("Inside Future Test..")
    implicit val ec = ExecutionContext.global
    val modulePaths = Iterable(new File("mdrqaint/MDR/QAINT/QAINTX"))
    val fut = callGetBusList(modulePaths)
    fut.map { str =>
        println("successful  future! "+ str)
    }.recover{
      case t: Throwable => println("Failed  future: "+ t.getMessage)
    }
    println("Active threads: " +Thread.activeCount())
    sys.allThreads().foreach(t => t.join())

  }    
}

1) 期货正在急切地开火,它们 。 参考问题的答案也包含一些关于 Future 内部行为 的见解,所以我想在这里跳过它。

为了以更可预测的方式管理有关执行 pools/queues/threads 的 side-effects,您可以考虑 scalaz/monix/fs2 Task or iteratee/scalaz/cats Eval (more abstract lazy evaluation, and intended for sync stuff) + Cont(延续是对订阅的抽象)作为替代方案。所有都是引用透明的并且延迟执行 "on-demand".

2) 最好的方法是您不喜欢的方法:不要在 Future 上下文之外抛出异常。

您也可以考虑 flatMap 以避免 Future[Future[T]]

3) 直接双重包装 Futures a-la Future(Future(...)) 不会改变任何东西。你的方法在 val etlF = g... 上执行(在同一个线程中),不管它是什么 return。 Future("ok") 的内容 (lambda) 在另一个线程上急切地执行(具有 "small" 不可预测的延迟),但 [执行任务正在提交到池中] 仍在 getBusinessListFromModules 中。

一个变通方法(不真正推荐)是 val etlF = Future(getBusinessListFromModules(...)).flatMap(identity) 它将 return 你将来包装任何直接来自 getBusinessListFromModules 和间接来自 getBusinessListFromModules 内部的异常Future

最好重构 getBusinessListFromModules 本身,但是,针对您的方法可能遇到的不同类型的问题(验证、同步与异步等)引入不同的异常类型。

P.S。有多种方法可以混合异步和同步异常处理,但实际上很难分析和预测这种混合行为(您可能已经注意到了)。代码变得丑陋。