Future.traverse是否保证执行顺序
Does Future.traverse ensure the order of execution
我用的是Future.traverse,执行顺序有保证。我的函数 fn
必须在运行下一个元素之前调用并完成 future。
val xs = Seq[T] ???
def fn(t: T): Future[Unit] = ???
Future.traverse(xs)(fn)
谢谢,
我觉得不像
Asynchronously and non-blockingly transforms a TraversableOnce[A] into a Future[TraversableOnce[B]] using the provided function A => Future[B]. This is useful for performing a parallel map.
文档中没有具体提及,这意味着如果存在更高效的方法,合同可能会发生变化。它还提到 "parallel map" 所以这是另一个暗示它不太可能保留执行顺序。
在 Scala 2.11 中实现 traverse
:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) { (fr, a) =>
val fb = fn(a)
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result())
val fb = fn(a)
创建 Future[B]
,然后才与先前创建的未来 for (r <- fr; b <- fb) yield (r += b)
组合。所以答案是否定的。没有执行顺序保证。
在 scala 2.12 中实现改变了:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) {
(fr, a) => fr.zipWith(fn(a))(_ += _)
}.map(_.result())(InternalCallbackExecutor)
但是 'next' future 是在(zipWith 的第一个参数是 'call by value')与之前的 fr
.
链接之前创建的
如果需要顺序遍历,只需对2.11的实现做一点改动:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) { (fr, a) =>
for (r <- fr; b <- fn(a)) yield (r += b)
}.map(_.result())
正如其他答案已经指出的那样:不,traverse
不会(必然[1])对元素按顺序应用转换直至完成。
但是,您可以制作等同于 linearize
的内容
也许是这样的:
import scala.concurrent._
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import language.higherKinds
/**
* Linearize asynchronously applies a given function in-order to a sequence of values, producing a Future with the result of the function applications.
* Execution of subsequent entries will be aborted if an exception is thrown in the application of the function.
*/
def linearize[T, U, C[T] <: Traversable[T]](s: C[T])(f: T => Future[U])(implicit cbf: CanBuildFrom[C[T], U, C[U]], e: ExecutionContext): Future[C[U]] = {
def next(i: Iterator[T], b: Builder[U, C[U]]): Future[C[U]] =
if(!i.hasNext) Future.successful(b.result)
else Future.unit.flatMap(_ => f(i.next()).flatMap(v => next(i, b += v)))
next(s.toIterator, cbf(s))
}
1:您可以想象同步 EC 实现顺序效果。
我用的是Future.traverse,执行顺序有保证。我的函数 fn
必须在运行下一个元素之前调用并完成 future。
val xs = Seq[T] ???
def fn(t: T): Future[Unit] = ???
Future.traverse(xs)(fn)
谢谢,
我觉得不像
Asynchronously and non-blockingly transforms a TraversableOnce[A] into a Future[TraversableOnce[B]] using the provided function A => Future[B]. This is useful for performing a parallel map.
文档中没有具体提及,这意味着如果存在更高效的方法,合同可能会发生变化。它还提到 "parallel map" 所以这是另一个暗示它不太可能保留执行顺序。
在 Scala 2.11 中实现 traverse
:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) { (fr, a) =>
val fb = fn(a)
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result())
val fb = fn(a)
创建 Future[B]
,然后才与先前创建的未来 for (r <- fr; b <- fb) yield (r += b)
组合。所以答案是否定的。没有执行顺序保证。
在 scala 2.12 中实现改变了:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) {
(fr, a) => fr.zipWith(fn(a))(_ += _)
}.map(_.result())(InternalCallbackExecutor)
但是 'next' future 是在(zipWith 的第一个参数是 'call by value')与之前的 fr
.
如果需要顺序遍历,只需对2.11的实现做一点改动:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(successful(cbf(in))) { (fr, a) =>
for (r <- fr; b <- fn(a)) yield (r += b)
}.map(_.result())
正如其他答案已经指出的那样:不,traverse
不会(必然[1])对元素按顺序应用转换直至完成。
但是,您可以制作等同于 linearize
的内容也许是这样的:
import scala.concurrent._
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import language.higherKinds
/**
* Linearize asynchronously applies a given function in-order to a sequence of values, producing a Future with the result of the function applications.
* Execution of subsequent entries will be aborted if an exception is thrown in the application of the function.
*/
def linearize[T, U, C[T] <: Traversable[T]](s: C[T])(f: T => Future[U])(implicit cbf: CanBuildFrom[C[T], U, C[U]], e: ExecutionContext): Future[C[U]] = {
def next(i: Iterator[T], b: Builder[U, C[U]]): Future[C[U]] =
if(!i.hasNext) Future.successful(b.result)
else Future.unit.flatMap(_ => f(i.next()).flatMap(v => next(i, b += v)))
next(s.toIterator, cbf(s))
}
1:您可以想象同步 EC 实现顺序效果。