如何防止 Future 永不结束
How to prevent Future from never finishing
假设我有多个任务要并行 运行。
每个任务(方法)都有一个内部递归函数,基本上从数据库中获取数据并将其保存在一些数据存储中。
[简化的内部递归函数]
def simplifiedSomeTask(): Unit = {
@scala.annotation.tailrec
def get(
stream: Stream[SomeEntity],
result: Seq[SomeEntity],
): Stream[SomeEntity] = result match {
case Nil =>
stream
case _ =>
val query = //query to fetch data from database
get(
stream append result.toStream,
query.run.value, // get fetched data from database
)
}
val buffer = collection.mutable.Map.empty[String, String]
get(
Stream.empty,
query.run.value
).foreach { r =>
buffer.put(r.loginId, r.userId)
}
}
尝试 运行 A 时,Future 出于某种原因从未完成。
[一个]
val f1 =Future { someTask1() }
val f2 =Future { someTask2() }
val f3 =Future { someTask3() }
val f = for {
_ <- f1
_ <- f2
_ <- f3
} yield ()
Await.result(f, Duration.Inf)
但是,B 可以工作(虽然它不能 运行 并行)
[乙]
val f = for {
_ <- Future { someTask1() }
_ <- Future { someTask2() }
_ <- Future { someTask3() }
} yield ()
Await.result(f, Duration.Inf)
我应该如何修改 A 以使其 运行 符合预期?
我无法重现您的问题,但奇怪行为的原因可能是您在第一个示例中的语法不完全正确。你应该把你的第一个 for-comprehension 写成这样:
val f = for {
_ <- f1
_ <- f2
_ <- f3
} yield ()
但是 for-comprehension 按顺序工作,在你的第一个例子中你的期货 运行 并行的唯一原因是期货急切地开始("Future starts now").
如果您想确保 Futures 将并行执行,请使用 Future.sequence
:
val f = Future.sequence(
List(
Future { someTask1() },
Future { someTask2() },
Future { someTask3() }
)
)
问题不在for-comprehension,而在你的任务。 运行 它们并行可能存在某种死锁,但我会首先进行三次检查,确保它们不会陷入无限循环。查看您的示例,如果 query.run.value
永远不会 returns 为空,则很容易发生这种情况,然后递归将永远持续下去。如果 f1
、f2
和 f3
的 any 没有解决,那么 f
当然也不会解决。
事实证明,在创建 query
对象时一些圆圈引用导致了这个问题。
假设我有多个任务要并行 运行。
每个任务(方法)都有一个内部递归函数,基本上从数据库中获取数据并将其保存在一些数据存储中。
[简化的内部递归函数]
def simplifiedSomeTask(): Unit = {
@scala.annotation.tailrec
def get(
stream: Stream[SomeEntity],
result: Seq[SomeEntity],
): Stream[SomeEntity] = result match {
case Nil =>
stream
case _ =>
val query = //query to fetch data from database
get(
stream append result.toStream,
query.run.value, // get fetched data from database
)
}
val buffer = collection.mutable.Map.empty[String, String]
get(
Stream.empty,
query.run.value
).foreach { r =>
buffer.put(r.loginId, r.userId)
}
}
尝试 运行 A 时,Future 出于某种原因从未完成。
[一个]
val f1 =Future { someTask1() }
val f2 =Future { someTask2() }
val f3 =Future { someTask3() }
val f = for {
_ <- f1
_ <- f2
_ <- f3
} yield ()
Await.result(f, Duration.Inf)
但是,B 可以工作(虽然它不能 运行 并行)
[乙]
val f = for {
_ <- Future { someTask1() }
_ <- Future { someTask2() }
_ <- Future { someTask3() }
} yield ()
Await.result(f, Duration.Inf)
我应该如何修改 A 以使其 运行 符合预期?
我无法重现您的问题,但奇怪行为的原因可能是您在第一个示例中的语法不完全正确。你应该把你的第一个 for-comprehension 写成这样:
val f = for {
_ <- f1
_ <- f2
_ <- f3
} yield ()
但是 for-comprehension 按顺序工作,在你的第一个例子中你的期货 运行 并行的唯一原因是期货急切地开始("Future starts now").
如果您想确保 Futures 将并行执行,请使用 Future.sequence
:
val f = Future.sequence(
List(
Future { someTask1() },
Future { someTask2() },
Future { someTask3() }
)
)
问题不在for-comprehension,而在你的任务。 运行 它们并行可能存在某种死锁,但我会首先进行三次检查,确保它们不会陷入无限循环。查看您的示例,如果 query.run.value
永远不会 returns 为空,则很容易发生这种情况,然后递归将永远持续下去。如果 f1
、f2
和 f3
的 any 没有解决,那么 f
当然也不会解决。
事实证明,在创建 query
对象时一些圆圈引用导致了这个问题。