使用 AKKA-HTTP 流式传输 CSV 源
Streaming CSV Source with AKKA-HTTP
我正在尝试使用 reactivemongo-akkastream 0.12.1 和 return 将数据从 Mongodb 流式传输到其中一条路线中的 CSV 流中(使用 Akka-http ).
我确实按照这里的例子实现了它:
看起来工作正常。
我现在面临的唯一问题是如何将 headers 添加到输出 CSV 文件中。有什么想法吗?
谢谢
除了该示例并不是真正可靠的 CSV 生成方法(不提供适当的转义)之外,您还需要对其进行一些修改以添加 headers。这是我会做的:
- 创建
Flow
将 Source[Tweet]
转换为 CSV 行的来源,例如一个 Source[List[String]]
- 将其连接到一个包含您的 headers 的来源作为单个
List[String]
- 调整编组器以呈现行源而不是推文
下面是一些示例代码:
case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
更新:如果您的 getTweets
方法 returns a Future
您可以只映射其源值并以这种方式添加 headers,例如:
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}
我正在尝试使用 reactivemongo-akkastream 0.12.1 和 return 将数据从 Mongodb 流式传输到其中一条路线中的 CSV 流中(使用 Akka-http ). 我确实按照这里的例子实现了它:
看起来工作正常。
我现在面临的唯一问题是如何将 headers 添加到输出 CSV 文件中。有什么想法吗?
谢谢
除了该示例并不是真正可靠的 CSV 生成方法(不提供适当的转义)之外,您还需要对其进行一些修改以添加 headers。这是我会做的:
- 创建
Flow
将Source[Tweet]
转换为 CSV 行的来源,例如一个Source[List[String]]
- 将其连接到一个包含您的 headers 的来源作为单个
List[String]
- 调整编组器以呈现行源而不是推文
下面是一些示例代码:
case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
更新:如果您的 getTweets
方法 returns a Future
您可以只映射其源值并以这种方式添加 headers,例如:
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}