自定义版本的 fileUpload 指令无法实现
Custom version of fileUpload directive fails to materialize
当用户将文件上传到我的网络服务时,我想从 POST 请求中收集非二进制字段。它们包含上传文件的元数据。所以我修改了akka-http的fileUpload
指令到这个
def fileUpload3(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat: Materializer = ctx.materializer
val fut =
formData.parts.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
if (part.filename.nonEmpty && part.name == fieldName) {
fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
} else if (part.filename.isEmpty && part.entity.contentType.mediaType == MediaTypes.`text/plain` && part.entity.isInstanceOf[HttpEntity.Strict]) {
fields.updated(part.name, part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String) → pairOpt
} else {
fields → pairOpt
}
}
.collect {
case (fields, Some((info, stream))) ⇒
(fields, info, stream)
}
.runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])
onSuccess(fut)
}
}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}
虽然我看不出与 original code 有多大区别,但当我使用它时它对我来说失败了,但有以下例外:
akka.stream.AbruptIOTerminationException: Stream terminated without completing IO operation.
Caused by: akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds
at akka.stream.impl.fusing.SubSource.timeout(StreamOfStreams.scala:746)
伙计们,我错过了什么?
一开始我没有意识到,但是因为我们从单个连续流中获取所有字段,所以即使 akka-streams 允许,我们也无法提取其中一个字段供以后通过 Source[T]
流式使用我们这样做。
因此,在处理下一个请求之前,必须耗尽多部分请求的每个部分。
另请注意,以下函数将仅收集二进制文件之前的文本字段。
def fileUploadWithFields(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat: Materializer = ctx.materializer
// Because it's continuous stream of fields we MUST consume each field before switching to next one. [
val fut = formData.parts
.takeWhile(part ⇒ !(part.filename.isDefined && part.name == fieldName), inclusive = true)
.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
if (part.filename.nonEmpty && part.name == fieldName) {
//println(s"Got file field: $part")
fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
} else if (part.filename.isEmpty && part.entity.contentType.mediaType.isText && part.entity.isInstanceOf[HttpEntity.Strict]) {
//println(s"Got text field: $part")
val text = part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String
fields.updated(part.name, text) → pairOpt
} else {
//println(s"IGNORING field: $part")
part.entity.discardBytes()
fields → pairOpt
}
}
.collect {
case (fields, Some((info, stream))) ⇒
//println(s"Completed scanning fields: ${(fields, info, stream)}")
(fields, info, stream)
}
.runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])
onSuccess(fut)
}
}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}
当用户将文件上传到我的网络服务时,我想从 POST 请求中收集非二进制字段。它们包含上传文件的元数据。所以我修改了akka-http的fileUpload
指令到这个
def fileUpload3(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat: Materializer = ctx.materializer
val fut =
formData.parts.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
if (part.filename.nonEmpty && part.name == fieldName) {
fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
} else if (part.filename.isEmpty && part.entity.contentType.mediaType == MediaTypes.`text/plain` && part.entity.isInstanceOf[HttpEntity.Strict]) {
fields.updated(part.name, part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String) → pairOpt
} else {
fields → pairOpt
}
}
.collect {
case (fields, Some((info, stream))) ⇒
(fields, info, stream)
}
.runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])
onSuccess(fut)
}
}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}
虽然我看不出与 original code 有多大区别,但当我使用它时它对我来说失败了,但有以下例外:
akka.stream.AbruptIOTerminationException: Stream terminated without completing IO operation.
Caused by: akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds
at akka.stream.impl.fusing.SubSource.timeout(StreamOfStreams.scala:746)
伙计们,我错过了什么?
一开始我没有意识到,但是因为我们从单个连续流中获取所有字段,所以即使 akka-streams 允许,我们也无法提取其中一个字段供以后通过 Source[T]
流式使用我们这样做。
因此,在处理下一个请求之前,必须耗尽多部分请求的每个部分。
另请注意,以下函数将仅收集二进制文件之前的文本字段。
def fileUploadWithFields(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat: Materializer = ctx.materializer
// Because it's continuous stream of fields we MUST consume each field before switching to next one. [
val fut = formData.parts
.takeWhile(part ⇒ !(part.filename.isDefined && part.name == fieldName), inclusive = true)
.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
if (part.filename.nonEmpty && part.name == fieldName) {
//println(s"Got file field: $part")
fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
} else if (part.filename.isEmpty && part.entity.contentType.mediaType.isText && part.entity.isInstanceOf[HttpEntity.Strict]) {
//println(s"Got text field: $part")
val text = part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String
fields.updated(part.name, text) → pairOpt
} else {
//println(s"IGNORING field: $part")
part.entity.discardBytes()
fields → pairOpt
}
}
.collect {
case (fields, Some((info, stream))) ⇒
//println(s"Completed scanning fields: ${(fields, info, stream)}")
(fields, info, stream)
}
.runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])
onSuccess(fut)
}
}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}