使用迭代器完成 akka-http 响应

Complete akka-http response with an iterator

我有一个 mongodb 查询结果的迭代器,我想将这些结果流式传输到 http 响应而不将整个结果集加载到内存中。

是否可以使用迭代器而不是集合或 future 来完成 akka http 响应?

看看Alpakka MongoDB connector。它允许从 Mongo 集合创建一个源,例如:

val source: Source[Document, NotUsed] = MongoSource(numbersColl.find())

val rows: Future[Seq[Document]] = source.runWith(Sink.seq)

或者您可能想要自己的源代码实现,例如 GraphStage。

给定 Iterator 数据:

type Data = ???

val dataIterator : () => Iterator[Data] = ???

你首先需要一个函数将Data转换为ByteString表示,以及表示的ContentTypee.g. json, binary, csv, xml, ...):

import akka.util.ByteString
import akka.http.scaladsl.model.ContentType    

val dataToByteStr : Data => ByteString = ???

//see akka.http.scaladsl.model.ContentTypes for possible values
val contentType : ContentType = ???

迭代器和转换器函数现在可用于创建一个 HttpResponse,它将结果流式传输回 http 客户端,而无需将整个数据集保存在内存中:

import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.http.scaladsl.model.ResponseEntity    
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpResponse

val chunks : Source[ChunkStreamPart,_] = 
  Source.fromIterator(dataIterator)
        .map(dataToByteStr)
        .map(ChunkStreamPart.apply)

val entity : ResponseEntity = Chunked.fromData(contentType, chunks)

val httpResponse : HttpResponse = HttpResponse(entity=entity)

注意:由于每次从 dataIterator 生成一个新的迭代器,您不必为每个传入请求创建一个新的 httpResponse;相同的响应可用于所有请求。