Akka HTTP Stream 侦听器在一段时间后停止处理数据字节
Akka HTTP Stream listener stops processing databytes after a while
我有一个有 3 个 HTTP 侦听器的应用程序:
val futureResponse1: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = someUrl))
这 3 个中的每一个都在收听一个不间断的流(每个都收听不同的流)。并用一个简单的流程处理它,从分组开始,然后是相对快速的处理(非阻塞):
futureResponse1.flatMap {response =>
response.status match {
case StatusCodes.OK =>
val source: Source[ByteString, Any] = response.entity.dataBytes
source.
grouped(100).
map(doSomethingFast).
runWith(Sink.ignore)
case notOK => system.log.info("failed opening, status: " + notOK.toString())
}
...
我没有收到任何异常或警告。但过了一会儿(可能是 15-25 分钟),听众突然停止了。一个接一个(不在一起)。
也许问题出在分组阶段?或者 connection/stream 就停止了?或者他们共享的调度员正在挨饿/有些东西没有被释放。
请问为什么会发生这种情况?
====更新====
@Ramon J Romero y Vigil
我更改了 运行 只有 1 个流而不是 3 个流,并且删除了分组阶段。几分钟后仍然发生。我怀疑流正在基于超时关闭。我所做的就是获取块并使用它们。
====更新====
找到原因,见下文
原因是:
EntityStreamSizeException: actual entity size (None) exceeded content length limit (8388608 bytes)! You can configure this by setting akka.http.[server|client].parsing.max-content-length or calling HttpEntity.withSizeLimit before materializing the dataBytes stream.
对于在连续响应流的情况下寻求解决方案的任何人,您可以使用 withoutSizeLimit 以这种方式获取源:
val source: Source[ByteString, Any] = response.entity.withoutSizeLimit().dataBytes
我有一个有 3 个 HTTP 侦听器的应用程序:
val futureResponse1: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = someUrl))
这 3 个中的每一个都在收听一个不间断的流(每个都收听不同的流)。并用一个简单的流程处理它,从分组开始,然后是相对快速的处理(非阻塞):
futureResponse1.flatMap {response =>
response.status match {
case StatusCodes.OK =>
val source: Source[ByteString, Any] = response.entity.dataBytes
source.
grouped(100).
map(doSomethingFast).
runWith(Sink.ignore)
case notOK => system.log.info("failed opening, status: " + notOK.toString())
}
...
我没有收到任何异常或警告。但过了一会儿(可能是 15-25 分钟),听众突然停止了。一个接一个(不在一起)。
也许问题出在分组阶段?或者 connection/stream 就停止了?或者他们共享的调度员正在挨饿/有些东西没有被释放。
请问为什么会发生这种情况?
====更新====
@Ramon J Romero y Vigil 我更改了 运行 只有 1 个流而不是 3 个流,并且删除了分组阶段。几分钟后仍然发生。我怀疑流正在基于超时关闭。我所做的就是获取块并使用它们。
====更新====
找到原因,见下文
原因是:
EntityStreamSizeException: actual entity size (None) exceeded content length limit (8388608 bytes)! You can configure this by setting akka.http.[server|client].parsing.max-content-length or calling HttpEntity.withSizeLimit before materializing the dataBytes stream.
对于在连续响应流的情况下寻求解决方案的任何人,您可以使用 withoutSizeLimit 以这种方式获取源:
val source: Source[ByteString, Any] = response.entity.withoutSizeLimit().dataBytes