scala Future to run sequential jobs
I am trying to launch three jobs sequentially, but when I try this code:
val jobs = Seq("stream.Job1", "stream.Job2", "stream.Job3")

Future.sequence {
  jobs.map { jobClass =>
    Future {
      println(s"Starting the spark job from class $jobClass...")
      % gcloud("sparkC", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$JarFile")
      println(s"Starting the spark job from class $jobClass...DONE")
    }
  }
}
the three jobs run in parallel, whereas I want them to run sequentially.
I believe the solution is to use flatMap, but I haven't been able to implement it.
Please help.
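To see why the code above overlaps the jobs: a Scala `Future` starts executing as soon as it is constructed, so all three bodies are scheduled the moment `jobs.map` builds them; `Future.sequence` only gathers their results. A minimal sketch of this behavior (using `Thread.sleep` and an event log as hypothetical stand-ins for the real gcloud call):

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue

val events = new ConcurrentLinkedQueue[String]()

// All three Futures are constructed (and therefore scheduled) right here.
val futures = Seq("Job1", "Job2", "Job3").map { name =>
  Future {
    events.add(s"$name start")
    Thread.sleep(200)          // stand-in for the long-running gcloud submission
    events.add(s"$name done")
  }
}

// Future.sequence merely waits for all of them; it does not order them.
Await.ready(Future.sequence(futures), 10.seconds)
println(events)
```

With the default global ExecutionContext (usually several threads), you will typically see all three "start" events before any "done" event, confirming the jobs ran concurrently.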
Try this:
import scala.util.control.NonFatal

val jobs = Seq("stream.Job1", "stream.Job2", "stream.Job3")

jobs.foldLeft(Future.successful(())) {
  case (result, jobClass) =>
    result.flatMap { _ =>
      Future {
        println(s"Starting the spark job from class $jobClass...")
        % gcloud("sparkC", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$JarFile")
        println(s"Starting the spark job from class $jobClass...DONE")
      }
    }.recoverWith {
      case NonFatal(e) => result
    }
}
This iterates over your jobs, starting each next Future only after the previous one has completed. I added the recoverWith block so that all the Futures are still processed even if one of them fails.
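The key to the ordering is that each step's Future is only constructed inside `flatMap`, i.e. after the previous step has completed. A minimal runnable sketch of the same foldLeft pattern, with an in-memory log as a hypothetical stand-in for the gcloud submission:

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue

val log = new ConcurrentLinkedQueue[String]()
val jobs = Seq("stream.Job1", "stream.Job2", "stream.Job3")

// Each Future is created only inside flatMap, after its predecessor
// completes, so the side effects run strictly in sequence.
val all = jobs.foldLeft(Future.successful(())) { (prev, jobClass) =>
  prev.flatMap { _ =>
    Future {
      log.add(jobClass)   // stand-in for the real gcloud submission
      ()
    }
  }
}

Await.ready(all, 10.seconds)
println(log)   // prints [stream.Job1, stream.Job2, stream.Job3]
```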
If the jobs don't depend on each other and you want a list of the results at the end, you can use this:
import scala.concurrent._

def runIndependentSequentially[X](futs: List[() => Future[X]])
                                 (implicit ec: ExecutionContext): Future[List[X]] = futs match {
  case Nil => Future { Nil }
  case h :: t => for {
    x  <- h()
    xs <- runIndependentSequentially(t)
  } yield x :: xs
}
Now you can use it on your list of future job factories like this:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

val jobs = List("stream.Job1", "stream.Job2", "stream.Job3")
val futFactories = jobs.map { jobClass =>
  () => Future {
    println(s"Starting the spark job from class $jobClass...")
    Thread.sleep(5000)
    "result[" + jobClass + "," + (System.currentTimeMillis / 1000) % 3600 + "]"
  }
}

println(Await.result(runIndependentSequentially(futFactories), 30 seconds))
This produces the following output:
Starting the spark job from class stream.Job1...
Starting the spark job from class stream.Job2...
Starting the spark job from class stream.Job3...
List(result[stream.Job1,3011], result[stream.Job2,3016], result[stream.Job3,3021])
UPDATE: replaced the list of futures with List[() => Future[X]] so that evaluation of the futures doesn't start before the arguments are even passed to the runIndependentSequentially method. Many thanks to @Evgeny for pointing it out!
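The difference the update makes can be demonstrated directly: a bare `Future` is scheduled at construction time, while a `() => Future[X]` thunk defers construction until it is invoked. A small sketch (the counter is just an illustration, not part of the original answer):

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic.AtomicInteger

val started = new AtomicInteger(0)

// Eager: the body is scheduled as soon as the Future is constructed.
val eager: List[Future[Int]] = List(Future { started.incrementAndGet() })
Await.ready(eager.head, 5.seconds)
// started.get is already 1, even though nothing has "consumed" the list.

// Lazy: wrapping in () => Future defers construction until the thunk runs.
val factories: List[() => Future[Int]] = List(() => Future { started.incrementAndGet() })
// started.get is still 1 here: the second Future does not exist yet.
Await.ready(factories.head(), 5.seconds)
// Only now has the second body run, so started.get is 2.
```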