使用迭代器完成 akka-http 响应
Complete akka-http response with an iterator
我有一个 mongodb 查询结果的迭代器,我想将这些结果流式传输到 http 响应而不将整个结果集加载到内存中。
是否可以使用迭代器而不是集合或 future 来完成 akka http 响应?
看看Alpakka MongoDB connector。它允许从 Mongo 集合创建一个源,例如:
val source: Source[Document, NotUsed] = MongoSource(numbersColl.find())
val rows: Future[Seq[Document]] = source.runWith(Sink.seq)
或者您可能想要自己的源代码实现,例如 GraphStage。
给定 Iterator
数据:
type Data = ???
val dataIterator : () => Iterator[Data] = ???
你首先需要一个函数将Data
转换为ByteString
表示,以及表示的ContentType
(e.g. json, binary, csv, xml, ...):
import akka.util.ByteString
import akka.http.scaladsl.model.ContentType
val dataToByteStr : Data => ByteString = ???
//see akka.http.scaladsl.model.ContentTypes for possible values
val contentType : ContentType = ???
迭代器和转换器函数现在可用于创建一个 HttpResponse,它将结果流式传输回 http 客户端,而无需将整个数据集保存在内存中:
import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.ResponseEntity
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpResponse
val chunks : Source[ChunkStreamPart,_] =
Source.fromIterator(dataIterator)
.map(dataToByteStr)
.map(ChunkStreamPart.apply)
val entity : ResponseEntity = Chunked.fromData(contentType, chunks)
val httpResponse : HttpResponse = HttpResponse(entity=entity)
注意:由于每次从 dataIterator
生成一个新的迭代器,您不必为每个传入请求创建一个新的 httpResponse
;相同的响应可用于所有请求。
我有一个 mongodb 查询结果的迭代器,我想将这些结果流式传输到 http 响应而不将整个结果集加载到内存中。
是否可以使用迭代器而不是集合或 future 来完成 akka http 响应?
看看Alpakka MongoDB connector。它允许从 Mongo 集合创建一个源,例如:
val source: Source[Document, NotUsed] = MongoSource(numbersColl.find())
val rows: Future[Seq[Document]] = source.runWith(Sink.seq)
或者您可能想要自己的源代码实现,例如 GraphStage。
给定 Iterator
数据:
type Data = ???
val dataIterator : () => Iterator[Data] = ???
你首先需要一个函数将Data
转换为ByteString
表示,以及表示的ContentType
(e.g. json, binary, csv, xml, ...):
import akka.util.ByteString
import akka.http.scaladsl.model.ContentType
val dataToByteStr : Data => ByteString = ???
//see akka.http.scaladsl.model.ContentTypes for possible values
val contentType : ContentType = ???
迭代器和转换器函数现在可用于创建一个 HttpResponse,它将结果流式传输回 http 客户端,而无需将整个数据集保存在内存中:
import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.ResponseEntity
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpResponse
val chunks : Source[ChunkStreamPart,_] =
Source.fromIterator(dataIterator)
.map(dataToByteStr)
.map(ChunkStreamPart.apply)
val entity : ResponseEntity = Chunked.fromData(contentType, chunks)
val httpResponse : HttpResponse = HttpResponse(entity=entity)
注意:由于每次从 dataIterator
生成一个新的迭代器,您不必为每个传入请求创建一个新的 httpResponse
;相同的响应可用于所有请求。