Play Framework Scala:如何流式传输请求正文
Play Framework Scala: How to Stream Request Body
我正在使用 Play Framework 2 构建微服务。3.x 使用 Scala(我在这两个方面都是初学者)但我想不出一种方法来流式传输我的请求正文。
问题是:
我需要一个端点 /transform
,我可以在其中接收一个巨大的 TSV 文件,我将以另一种格式解析和呈现该文件:简单的 t运行sformation。问题是我的控制器中的每个命令都是 运行 "too late"。它在开始代码之前等待接收完整文件。
示例:
def transform = Action.async {
Future {
Logger.info("Too late")
Ok("A response")
}
}
我希望能够在上传过程中逐行读取请求正文并处理请求,而不必等待文件被完全接收。
欢迎任何提示。
此答案适用于 Play 2。5.x 及更高版本,因为它使用 Akka 流 API 取代了该版本中基于 Iteratee 的流。
基本上,您可以创建一个 returns 可以传递给 Ok.chunked(...)
的 Source[T]
正文解析器。一种方法是在正文解析器中使用 Accumulator.source[T]
。例如,刚刚逐字返回发送给它的数据的操作可能如下所示:
def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
// Return the source directly. We need to return
// an Accumulator[Either[Result, T]], so if we were
// handling any errors we could map to something like
// a Left(BadRequest("error")). Since we're not
// we just wrap the source in a Right(...)
Accumulator.source[ByteString]
.map(Right.apply)
}
def stream = Action(verbatimBodyParser) { implicit request =>
Ok.chunked(request.body)
}
如果你想做一些类似转换 TSV 文件的事情,你可以使用 Flow
来转换源代码,例如:
val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>
val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
// Chunk incoming bytes by newlines, truncating them if the lines
// are longer than 1000 bytes...
.via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
// Replace tabs by commas. This is just a silly example and
// you could obviously do something more clever here...
.map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))
Accumulator.source[ByteString]
.map(_.via(transformFlow))
.map(Right.apply)
}
def convert = Action(tsvToCsv) { implicit request =>
Ok.chunked(request.body).as("text/csv")
}
Play 文档的 Directing the Body Elsewhere 部分可能会有更多灵感。
我正在使用 Play Framework 2 构建微服务。3.x 使用 Scala(我在这两个方面都是初学者)但我想不出一种方法来流式传输我的请求正文。
问题是:
我需要一个端点 /transform
,我可以在其中接收一个巨大的 TSV 文件,我将以另一种格式解析和呈现该文件:简单的 t运行sformation。问题是我的控制器中的每个命令都是 运行 "too late"。它在开始代码之前等待接收完整文件。
示例:
def transform = Action.async {
Future {
Logger.info("Too late")
Ok("A response")
}
}
我希望能够在上传过程中逐行读取请求正文并处理请求,而不必等待文件被完全接收。
欢迎任何提示。
此答案适用于 Play 2。5.x 及更高版本,因为它使用 Akka 流 API 取代了该版本中基于 Iteratee 的流。
基本上,您可以创建一个 returns 可以传递给 Ok.chunked(...)
的 Source[T]
正文解析器。一种方法是在正文解析器中使用 Accumulator.source[T]
。例如,刚刚逐字返回发送给它的数据的操作可能如下所示:
def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
// Return the source directly. We need to return
// an Accumulator[Either[Result, T]], so if we were
// handling any errors we could map to something like
// a Left(BadRequest("error")). Since we're not
// we just wrap the source in a Right(...)
Accumulator.source[ByteString]
.map(Right.apply)
}
def stream = Action(verbatimBodyParser) { implicit request =>
Ok.chunked(request.body)
}
如果你想做一些类似转换 TSV 文件的事情,你可以使用 Flow
来转换源代码,例如:
val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>
val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
// Chunk incoming bytes by newlines, truncating them if the lines
// are longer than 1000 bytes...
.via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
// Replace tabs by commas. This is just a silly example and
// you could obviously do something more clever here...
.map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))
Accumulator.source[ByteString]
.map(_.via(transformFlow))
.map(Right.apply)
}
def convert = Action(tsvToCsv) { implicit request =>
Ok.chunked(request.body).as("text/csv")
}
Play 文档的 Directing the Body Elsewhere 部分可能会有更多灵感。