map( T => Future[U]) 和 flatMapConcat( T => Source.fromFuture(Future[U])) 之间的 Akka 流差异
Akka stream difference between map( T => Future[U]) and flatMapConcat( T => Source.fromFuture(Future[U]))
请问,定义 Sink[RandomCdr,Future[Done] 的这两种方法有什么区别
Flow[RandomCdr]
.grouped(bulkSize)
.flatMapConcat{ (bulk : Seq[RandomCdr]) =>
Source.fromFuture(collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
}
.toMat(Sink.ignore)(Keep.right)
Flow[RandomCdr]
.grouped(bulkSize)
.map((bulk : Seq[RandomCdr]) => collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
.toMat(Sink.ignore)(Keep.right)
函数 collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)
returns 一个 Future[T] 是 reactivemongo 驱动程序
第一个片段
此处每个传入的批量将被转换为一个 Future
,并且在您提供的执行上下文中所说的 Future
将是 运行。只有在这一点上,下一个批量将通过生成另一个 Future
来处理,依此类推。
基本上期货是运行的顺序。这在行为上类似于
Flow[RandomCdr]
.grouped(bulkSize)
.mapAsync(parallelism = 1){ (bulk : Seq[RandomCdr]) =>
collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)
}
.toMat(Sink.ignore)(Keep.right)
第二个片段
此处每个传入的批量都将转换为 Future
,这将在您提供的执行上下文中 运行。 Future
将立即传递给 Sink.ignore
并且其引用将被丢弃。
使用这种方法无法控制同时 Future
的数量 运行。因此,不推荐使用这种方法。
如果您正在寻找改进的并行度,请考虑使用如上所示的 mapAsync
,并调整并行度参数。
请问,定义 Sink[RandomCdr,Future[Done] 的这两种方法有什么区别
Flow[RandomCdr]
.grouped(bulkSize)
.flatMapConcat{ (bulk : Seq[RandomCdr]) =>
Source.fromFuture(collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
}
.toMat(Sink.ignore)(Keep.right)
Flow[RandomCdr]
.grouped(bulkSize)
.map((bulk : Seq[RandomCdr]) => collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
.toMat(Sink.ignore)(Keep.right)
函数 collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)
returns 一个 Future[T] 是 reactivemongo 驱动程序
第一个片段
此处每个传入的批量将被转换为一个 Future
,并且在您提供的执行上下文中所说的 Future
将是 运行。只有在这一点上,下一个批量将通过生成另一个 Future
来处理,依此类推。
基本上期货是运行的顺序。这在行为上类似于
Flow[RandomCdr]
.grouped(bulkSize)
.mapAsync(parallelism = 1){ (bulk : Seq[RandomCdr]) =>
collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)
}
.toMat(Sink.ignore)(Keep.right)
第二个片段
此处每个传入的批量都将转换为 Future
,这将在您提供的执行上下文中 运行。 Future
将立即传递给 Sink.ignore
并且其引用将被丢弃。
使用这种方法无法控制同时 Future
的数量 运行。因此,不推荐使用这种方法。
如果您正在寻找改进的并行度,请考虑使用如上所示的 mapAsync
,并调整并行度参数。