Chain Scala Futures when processing a Seq of objects?
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.Future._
import scala.concurrent.ExecutionContext.Implicits.global
object TestClass {
  final case class Record(id: String)
  final case class RecordDetail(id: String)
  final case class UploadResult(result: String)

  val ids: Seq[String] = Seq("a", "b", "c", "d")

  def fetch(id: String): Future[Option[Record]] = Future {
    Thread sleep 100
    if (id != "b" && id != "d") {
      Some(Record(id))
    } else None
  }

  def fetchRecordDetail(record: Record): Future[RecordDetail] = Future {
    Thread sleep 100
    RecordDetail(record.id + "_detail")
  }

  def upload(recordDetail: RecordDetail): Future[UploadResult] = Future {
    Thread sleep 100
    UploadResult(recordDetail.id + "_uploaded")
  }

  def notifyUploaded(results: Seq[UploadResult]): Unit = println("notified " + results)

  def main(args: Array[String]): Unit = {
    // for each id from ids, call fetch method and if record exists call fetchRecordDetail
    // and after that upload RecordDetail, collect all UploadResults into seq
    // and call notifyUploaded with that seq and await result, you should see "notified ...." in console

    // In the following line of code how do I pass result of fetch to fetchRecordDetail function
    val result = Future.traverse(ids)(x => Future(fetch(x)))
    // val result: Future[Unit] = ???
    Await.ready(result, Duration.Inf)
  }
}
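My problem is that I don't know what code to put in main to make it work as described in the comments. To sum up, I have ids: Seq[String], and I want each id to go through the asynchronous methods fetch, fetchRecordDetail, and upload, and finally the whole Seq to arrive at notifyUploaded.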
I think the simplest way is:
def main(args: Array[String]): Unit = {
  // for each id from ids, call fetch method and if record exists call fetchRecordDetail
  // and after that upload RecordDetail, collect all UploadResults into seq
  // and call notifyUploaded with that seq and await result, you should see "notified ...." in console
  import scala.util.Success // needed for the pattern match in andThen below

  // Run f only when a value is present; otherwise short-circuit to a completed Future(None).
  def runWithOption[A, B](f: A => Future[B], oa: Option[A]): Future[Option[B]] = oa match {
    case Some(a) => f(a).map(b => Some(b))
    case None    => Future.successful(None)
  }

  val ids: Seq[String] = Seq("a", "b", "c", "d")

  // One Future per id; each one carries an Option through the whole chain.
  val resultSeq: Seq[Future[Option[UploadResult]]] = ids.map { id =>
    for {
      maybeRecord <- fetch(id)
      maybeDetail <- runWithOption(fetchRecordDetail, maybeRecord)
      maybeUpload <- runWithOption(upload, maybeDetail)
    } yield maybeUpload
  }

  // Turn Seq[Future[...]] into Future[Seq[...]] and keep only the ids that produced a result.
  val filteredResult: Future[Seq[UploadResult]] =
    Future.sequence(resultSeq).map(s => s.collect { case Some(ur) => ur })

  // andThen runs the side effect and passes the original result through unchanged.
  val result: Future[Seq[UploadResult]] =
    filteredResult.andThen { case Success(s) => notifyUploaded(s) }

  Await.ready(result, Duration.Inf)
}
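The idea is that you first build a Seq[Future[_]] by mapping each id through all the methods (done here with a for-comprehension). The important trick is to actually pass around a Seq[Future[Option[_]]]. Carrying the Option[_] through the whole chain via the runWithOption helper simplifies the code considerably, and nothing has to block until the final stage.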
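To see what runWithOption does in isolation, here is a minimal sketch. The helper is copied from the answer; lengthOf and the object name RunWithOptionDemo are hypothetical and exist only for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object RunWithOptionDemo {
  // Same helper as in the answer: call f only when a value is present.
  def runWithOption[A, B](f: A => Future[B], oa: Option[A]): Future[Option[B]] =
    oa match {
      case Some(a) => f(a).map(b => Some(b))
      case None    => Future.successful(None)
    }

  // Hypothetical asynchronous step, used only for this illustration.
  def lengthOf(s: String): Future[Int] = Future(s.length)

  def main(args: Array[String]): Unit = {
    val present = Await.result(runWithOption(lengthOf, Some("abc")), 5.seconds)
    val absent  = Await.result(runWithOption(lengthOf, Option.empty[String]), 5.seconds)
    println(present) // Some(3)
    println(absent)  // None
  }
}
```

When the input is None, f is never called, so a missing record skips the later stages without any special-casing in the for-comprehension.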
Then you convert the Seq[Future[_]] into a Future[Seq[_]] and filter out the results for those ids that came back empty at the fetch stage. Finally you apply notifyUploaded.
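The conversion and filtering step can be sketched on its own. The values in perId below are made-up stand-ins for the per-id pipelines, and SequenceCollectDemo is a hypothetical name.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SequenceCollectDemo {
  // Stand-ins for the per-id pipelines: some yield a value, one yields None.
  val perId: Seq[Future[Option[String]]] = Seq(
    Future.successful(Some("a_detail_uploaded")),
    Future.successful(None),
    Future.successful(Some("c_detail_uploaded"))
  )

  // Seq[Future[Option[String]]] -> Future[Seq[Option[String]]] -> Future[Seq[String]]
  val kept: Future[Seq[String]] =
    Future.sequence(perId).map(s => s.collect { case Some(v) => v })

  def main(args: Array[String]): Unit =
    // Prints the two uploaded values; the None entry is dropped.
    println(Await.result(kept, 5.seconds))
}
```

Future.sequence keeps the input order, so the surviving results appear in the same order as their ids.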
P.S. Note that this code has no error handling at all, and it is not clear how you want it to behave when an error occurs at one of the stages.
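As one possible policy, and only as an assumption since the question does not specify one, a failed pipeline for a single id could be logged and treated as "no result". Without something like this, Future.sequence fails as a whole as soon as any element fails. The names process, processSafely, and RecoverDemo are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal

object RecoverDemo {
  // Hypothetical pipeline that fails for one id, to simulate an error in the chain.
  def process(id: String): Future[Option[String]] =
    if (id == "c") Future.failed(new RuntimeException(s"upload failed for $id"))
    else Future.successful(Some(id + "_uploaded"))

  // Assumed policy: report the failure and treat that id as if it had no result.
  def processSafely(id: String): Future[Option[String]] =
    process(id).recover { case NonFatal(e) =>
      println(s"skipping $id: ${e.getMessage}")
      None
    }

  def main(args: Array[String]): Unit = {
    val all: Future[Seq[String]] =
      Future.sequence(Seq("a", "c", "e").map(processSafely))
        .map(s => s.collect { case Some(v) => v })
    // The failing id "c" is skipped; the other two results are kept.
    println(Await.result(all, 5.seconds))
  }
}
```

If a single failure should instead abort the whole batch, leaving out the recover and handling the failure once on the combined Future would be the simpler choice.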