未调用未来的 onComplete 回调

Future onComplete callback is not called

我正在尝试按如下方式记录每个任务的运行时间。

我遇到的问题是 logElapsedTime 中的回调方法由于某种原因从未被调用。

只调用 Future f 的最后一个回调。

如何解决此问题以便正确记录每个经过的时间?

    def logElapsedTime[T](f: Future[T], description: String): Future[T] = {
      val start = System.currentTimeMillis()
      f onComplete (_ => logger.debug(s"$description took [${System.currentTimeMillis() - start}]"))
      f
    }

    val f = for {
      _ <- logElapsedTime(task1(), "1st task to be executed")
      result <- logElapsedTime(task2(), "2nd task to be executed")
      _ <- logElapsedTime(task3(), "3rd task to be executed")
      _ <- logElapsedTime(task4(), "4th task to be executed")
    } yield result

    f onComplete {
      case Success(v) =>
        logger.info(s"tasks succeeded !!!! $v")
      case Failure(ex) =>
        logger.error(ex.getMessage)
        throw ex
    }   

输出样本↓

成功时间:

tasks succeeded !!!! some value

失败时:

some error message

没有记录其他输入。
(日志级别设置为调试及以上)

你的逻辑没有问题。我建议您尝试一些修改。

import org.slf4j.LoggerFactory

import scala.concurrent.Future
import concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn
import scala.util.{Failure, Success}

object FutureOnComplete extends App {

  private val logger = LoggerFactory.getLogger("test")

  def logElapsedTime[T](f: => Future[T], description: String): Future[T] = {
    val start = System.currentTimeMillis()
    f.onComplete(
      _ =>
        logger.warn(
          s"$description took [${System.currentTimeMillis() - start}]"))
    f
  }

  val f = for {
    _ <- logElapsedTime(Future(1), "1st task to be executed")
    result <- logElapsedTime(Future(2), "2nd task to be executed")
    _ <- logElapsedTime(Future(2), "3rd task to be executed")
    _ <- logElapsedTime(Future(2), "4th task to be executed")
  } yield result

  f.onComplete {
    case Success(v) =>
      logger.info(s"tasks succeeded !!!! $v")
    case Failure(ex) =>
      logger.error(ex.getMessage)
      throw ex
  }

  StdIn.readLine()

}
  • 将日志级别提高到 warn 以确保您的日志记录不会受到指责。或者用 println
  • 代替
  • 例如,在主线程中等待将来完成 StdIn.readLine()。这允许异步进程完成并 onComplete 到 运行.
  • 通过名称参数使用=> Future[T] 在方法logElapsedTime 中开始执行future。这只会在未来开始时改变,但不会改变日志记录的逻辑

考虑 andThen,当我们只想执行日志记录作为副作用而不转换 Future 中的值时,例如

object futureAndThenLogging extends App with LazyLogging {

  def logElapsedTime[T](f: Future[T], description: String): Future[T] = {
    val start = System.currentTimeMillis()
    f andThen { case _ => logger.debug(s"$description took [${System.currentTimeMillis() - start}]") }
  }

  def task1() = Future(1)
  def task2() = Future(2)
  def task3() = Future(3)
  def task4() = Future(4)

  (for {
    _ <- logElapsedTime(task1(), "1st task to be executed")
    result <- logElapsedTime(task2(), "2nd task to be executed")
    _ <- logElapsedTime(task3(), "3rd task to be executed")
    _ <- logElapsedTime(task4(), "4th task to be executed")
  } yield result)
    .andThen {
      case Success(v) => logger.info(s"tasks succeeded !!!! $v")
      case Failure(ex) => logger.error(ex.getMessage)
    }

  Thread.sleep(1000) // just for demonstration purposes
}

请注意我们如何不必在 case Failure(ex) => logger.error(ex.getMessage) 中重新 throw ex