源流上的 Akka 流 mapConcat 运算符

Akka Streams mapConcat Operator on Source Streams

我正在阅读 Akka 流的文档,我遇到了 mapConcat 运算符,它类似于 flatMap(至少在概念层面上)。

这是一个简单的例子:

scala> val src = Source.fromFuture(Future.successful(1 to 10))
src: akka.stream.scaladsl.Source[scala.collection.immutable.Range.Inclusive,akka.NotUsed] = Source(SourceShape(FutureSource.out(51943878)))

我原以为源代码的类型是:

akka.stream.scaladsl.Source[Future[scala.collection.immutable.Range.Inclusive],akka.NotUsed]

为什么不是这样?

我对每一行的类型的理解如下:

Source
  .fromFuture(Future.successful(1 to 10)) // Source[Future[Int]]
  .mapConcat(identity) // Source[Int]
  .runForeach(println)

但是上面例子中的Source类型并不是我想的那样!

Source.fromFuture的签名是:

def fromFuture[O](future: Future[O]): Source[O, NotUsed]

在您的示例中,Oscala.collection.immutable.Range.Inclusive 类型,因此 Source.fromFuture 的 return 类型是:

Source[scala.collection.immutable.Range.Inclusive, NotUsed]

Scala docs

这里有一个例子展示了 mapmapConcat 之间的区别:

def f: Future[List[Int]] = Future.successful((1 to 5).toList)

def g(l: List[Int]): List[String] = l.map(_.toString * 2)

Source
  .fromFuture(f)
  .mapConcat(g) // emits 5 elements of type Int
  .runForeach(println)

Source
  .fromFuture(f)
  .map(g) // emits one element of type List[Int]
  .runForeach(println)