如何将 `Seq[Future[_]]` 流式传输到 `Future[Stream[_]]` 或 `Stream[_]` 以便它可以按顺序使用?
How to stream a `Seq[Future[_]]` into either a `Future[Stream[_]]` or a `Stream[_]` such that it can consumed as it becomes available in order?
作为第一次尝试,我尝试在 Seq
的头部使用 Await.result
,然后使用惰性 #::
Stream
构造函数。但是,它似乎没有预期的那么好,因为我还没有找到一种方法来告诉调度程序对列表的顺序进行优先排序,编译器也没有将其识别为 @tailrec
.
implicit class SeqOfFuture[X](seq: Seq[Future[X]]) {
lazy val stream: Stream[X] =
if (seq.nonEmpty) Await.result(seq.head) #:: seq.tail.stream
else Stream.empty
}
我正在尝试这样做,因为 Future.collect
似乎要等到整个 strict Seq
是 available/ready 才能 map/flatmap/transform 更进一步。 (还有其他计算,我可能会从中间结果流开始)
(Proto)使用示例:
val searches = [SearchParam1, SearchParam2..., SearchParam200]
// big queries that take a some 100ms each for ~20s total wait
val futureDbResult = searches.map(search => (quill)ctx.run { query(search) }).stream
// Stuff that should happen as results become available instead of blocking/waiting ~20 seconds before starting
val processedResults = futureDbResult.map(transform).filter(reduce)
// Log?
processedResults.map(result => log.info/log.trace)
//return lazy processedResults list or Future {processedResults}
???
正如其他人所指出的,您真的应该研究像 fs2
或 monix
这样的真正的流媒体库。我个人认为,如果您与 Future
交互并且只在您的应用程序的一小部分中需要它,那么 monix 是一个不错的选择。它为这个用例提供了很好的 API 和文档。
这是您的用例的小演示:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._
import scala.util.Random
// requires: libraryDependencies += "io.monix" %% "monix" % "3.0.0"
object Main {
val searchParams = (1 to 200).map(n => s"Search $n")
/**
* Simulates a query. If your library returns a Future, you can wrap it with `Task.deferFuture`
*/
def search(param: String): Task[String] =
Task(s"Result for $param").delayResult(Random.between(25, 250).milliseconds)
val results: Task[List[String]] =
Observable
.fromIterable(searchParams)
.mapParallelUnordered(parallelism = 4)(param => search(param))
.mapEval { result =>
Task(println(result)).map(_ => result) // print intermediate results as feedback
}
.toListL // collect results into List
/**
* If you aren't going all-in on monix, you probably run the stream into a Future with `results.runToFuture`
*/
def main(args: Array[String]): Unit = results.map(_ => ()).runSyncUnsafe()
}
你可以把Task
想象成一个懒惰的,更强大的Future
。 Observable
是一个(反应性)流,如果下游速度慢,它将自动背压。在此示例中,只有 4 个查询将 运行 并行,另一个将等待 "slot" 可用于 运行。
请记住,在这些库中,副作用(例如 println
必须包含在 Task
中(或 IO
取决于您使用的内容)。
如果您提供 monix 依赖项并试用它以了解其工作原理,则可以在本地 运行 此示例。
作为第一次尝试,我尝试在 Seq
的头部使用 Await.result
,然后使用惰性 #::
Stream
构造函数。但是,它似乎没有预期的那么好,因为我还没有找到一种方法来告诉调度程序对列表的顺序进行优先排序,编译器也没有将其识别为 @tailrec
.
implicit class SeqOfFuture[X](seq: Seq[Future[X]]) {
lazy val stream: Stream[X] =
if (seq.nonEmpty) Await.result(seq.head) #:: seq.tail.stream
else Stream.empty
}
我正在尝试这样做,因为 Future.collect
似乎要等到整个 strict Seq
是 available/ready 才能 map/flatmap/transform 更进一步。 (还有其他计算,我可能会从中间结果流开始)
(Proto)使用示例:
val searches = [SearchParam1, SearchParam2..., SearchParam200]
// big queries that take a some 100ms each for ~20s total wait
val futureDbResult = searches.map(search => (quill)ctx.run { query(search) }).stream
// Stuff that should happen as results become available instead of blocking/waiting ~20 seconds before starting
val processedResults = futureDbResult.map(transform).filter(reduce)
// Log?
processedResults.map(result => log.info/log.trace)
//return lazy processedResults list or Future {processedResults}
???
正如其他人所指出的,您真的应该研究像 fs2
或 monix
这样的真正的流媒体库。我个人认为,如果您与 Future
交互并且只在您的应用程序的一小部分中需要它,那么 monix 是一个不错的选择。它为这个用例提供了很好的 API 和文档。
这是您的用例的小演示:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._
import scala.util.Random
// requires: libraryDependencies += "io.monix" %% "monix" % "3.0.0"
object Main {
val searchParams = (1 to 200).map(n => s"Search $n")
/**
* Simulates a query. If your library returns a Future, you can wrap it with `Task.deferFuture`
*/
def search(param: String): Task[String] =
Task(s"Result for $param").delayResult(Random.between(25, 250).milliseconds)
val results: Task[List[String]] =
Observable
.fromIterable(searchParams)
.mapParallelUnordered(parallelism = 4)(param => search(param))
.mapEval { result =>
Task(println(result)).map(_ => result) // print intermediate results as feedback
}
.toListL // collect results into List
/**
* If you aren't going all-in on monix, you probably run the stream into a Future with `results.runToFuture`
*/
def main(args: Array[String]): Unit = results.map(_ => ()).runSyncUnsafe()
}
你可以把Task
想象成一个懒惰的,更强大的Future
。 Observable
是一个(反应性)流,如果下游速度慢,它将自动背压。在此示例中,只有 4 个查询将 运行 并行,另一个将等待 "slot" 可用于 运行。
请记住,在这些库中,副作用(例如 println
必须包含在 Task
中(或 IO
取决于您使用的内容)。
如果您提供 monix 依赖项并试用它以了解其工作原理,则可以在本地 运行 此示例。