如何使用 flatMapConcat?
How to use flatMapConcat?
我正在尝试按以下方式使用 flatMapConcat
:
Source.empty
.flatMapConcat {
Source.fromFuture(Future("hello"))
}
.runWith(Sink.foreach(println))
.onComplete {
case Success(_) =>
println()
case Failure(e) =>
println(s"Thrown ${e.getMessage}")
}
编译器抱怨:
Error:(31, 26) type mismatch;
found : akka.stream.scaladsl.Source[String,akka.NotUsed]
required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
Source.fromFuture(Future("hello"))
我做错了什么?
方法 flatMapConcat 具有以下签名:
def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]
在处理 String
中的 Source
的情况下,需要这样的函数:
f: String => Source(Iterable[String])
您的示例代码的另一个问题是 Source.empty[T]
没有要处理的元素,因此后续的 flatMapConcat
将永远不会执行。
下面是一个使用 flatMapConcat
从 Source
个名称中转换每个元素的示例:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
Source(List("alice", "bob", "jenn")).
flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
runWith(Sink.foreach(println))
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi jenn
// Bye jenn
作为旁注,可以将上面示例中的 flatMapConcat
替换为 mapConcat,这需要更简单的函数签名:
Source(List("alice", "bob", "jenn")).
mapConcat{ name => List(s"Hi $name", s"Bye $name") }.
runWith(Sink.foreach(println))
我正在尝试按以下方式使用 flatMapConcat
:
Source.empty
.flatMapConcat {
Source.fromFuture(Future("hello"))
}
.runWith(Sink.foreach(println))
.onComplete {
case Success(_) =>
println()
case Failure(e) =>
println(s"Thrown ${e.getMessage}")
}
编译器抱怨:
Error:(31, 26) type mismatch;
found : akka.stream.scaladsl.Source[String,akka.NotUsed]
required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
Source.fromFuture(Future("hello"))
我做错了什么?
方法 flatMapConcat 具有以下签名:
def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]
在处理 String
中的 Source
的情况下,需要这样的函数:
f: String => Source(Iterable[String])
您的示例代码的另一个问题是 Source.empty[T]
没有要处理的元素,因此后续的 flatMapConcat
将永远不会执行。
下面是一个使用 flatMapConcat
从 Source
个名称中转换每个元素的示例:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
Source(List("alice", "bob", "jenn")).
flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
runWith(Sink.foreach(println))
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi jenn
// Bye jenn
作为旁注,可以将上面示例中的 flatMapConcat
替换为 mapConcat,这需要更简单的函数签名:
Source(List("alice", "bob", "jenn")).
mapConcat{ name => List(s"Hi $name", s"Bye $name") }.
runWith(Sink.foreach(println))