map( T => Future[U]) 和 flatMapConcat( T => Source.fromFuture(Future[U])) 之间的 Akka 流差异

Akka stream difference between map( T => Future[U]) and flatMapConcat( T => Source.fromFuture(Future[U]))

请问,定义 Sink[RandomCdr,Future[Done] 的这两种方法有什么区别

Flow[RandomCdr]
      .grouped(bulkSize)
      .flatMapConcat{ (bulk : Seq[RandomCdr]) =>
        Source.fromFuture(collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
      }
      .toMat(Sink.ignore)(Keep.right)


Flow[RandomCdr]
  .grouped(bulkSize)
  .map((bulk : Seq[RandomCdr]) => collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec))
  .toMat(Sink.ignore)(Keep.right)

函数 collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec) returns 一个 Future[T] 是 reactivemongo 驱动程序

第一个片段

此处每个传入的批量将被转换为一个 Future,并且在您提供的执行上下文中所说的 Future 将是 运行。只有在这一点上,下一个批量将通过生成另一个 Future 来处理,依此类推。

基本上期货是运行的顺序。这在行为上类似于

Flow[RandomCdr]
      .grouped(bulkSize)
      .mapAsync(parallelism = 1){ (bulk : Seq[RandomCdr]) =>
        collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)
      }
      .toMat(Sink.ignore)(Keep.right)

第二个片段

此处每个传入的批量都将转换为 Future,这将在您提供的执行上下文中 运行。 Future 将立即传递给 Sink.ignore 并且其引用将被丢弃。

使用这种方法无法控制同时 Future 的数量 运行。因此,不推荐使用这种方法。

如果您正在寻找改进的并行度,请考虑使用如上所示的 mapAsync,并调整并行度参数。