Akka Streams - Source.unfoldAsync 的背压
Akka Streams - Backpressure for Source.unfoldAsync
我目前正在尝试读取分页的 HTTP 资源。每个页面都是一个多部分文档,如果有包含更多内容的页面,则该页面的响应会在 headers 中包含一个 next
link。然后,自动解析器可以从最旧的页面开始,然后使用 headers 逐页读取以构造对下一页的请求。
我正在使用 Akka Streams 和 Akka Http 来实现,因为我的目标是创建流式处理解决方案。我想出了这个(我将在这里只包含代码的相关部分,请随意查看整个代码 this gist):
def read(request: HttpRequest): Source[HttpResponse, _] =
Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)
val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
.flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
.flatMapConcat(_.parts)
....
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
case Some(req) =>
Http().singleRequest(req).map { response =>
if (response.status.isFailure()) Some((None, response))
else nextRequest(response, HttpMethods.GET)
}
case None => Future.successful(None)
}
所以一般的想法是使用 Source.unfoldAsync
来爬取页面并进行 HTTP 请求(这个想法和实现与 中描述的非常接近)。这将创建一个Source[HttpResponse, _]
然后可以使用(Unmarshal 到 Multipart,分成单独的部分,...)。
我现在的问题是 HttpResponse
s 的消耗可能需要一段时间(如果页面很大,解组需要一些时间,最后可能会有一些数据库请求来持久化一些数据, ...)。所以如果下游速度较慢,我希望 Source.unfoldAsync
背压。默认情况下,下一个 HTTP 请求将在上一个请求完成后立即开始。
所以我的问题是:有什么方法可以对缓慢的下游产生 Source.unfoldAsync
背压吗?如果没有,是否有替代方案可以实现背压?
我可以想象一个利用 akka-http 提供的 Host-Level Client-Side API 的解决方案,如 here 所述以及循环图其中第一个请求的响应将用作生成第二个请求的输入,但我还没有尝试过,我不确定这是否可行。
编辑: 在玩了几天并阅读了文档和一些博客之后,我不确定我是否在正确的轨道上假设背压行为Source.unfoldAsync
是根本原因。要添加更多观察结果:
- 当流开始时,我看到发出了几个请求。这首先没有问题,只要及时消耗生成的
HttpResponse
(有关说明,请参阅 here)
- 如果我不更改默认值
response-entity-subscription-timeout
,我将 运行 出现以下错误(我删除了 URL):
[WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
这导致 IllegalStateException
终止流:java.lang.IllegalStateException: Substream Source cannot be materialized more than once
- 我观察到响应的解组是流中最慢的部分,这可能是有道理的,因为响应 body 是一个多部分文档,因此相对较大。但是,我希望流的这一部分向上游发出更少的需求信号(在我的例子中是
Source.unfoldAsync
部分)。这应该会导致请求减少。
- 一些谷歌搜索将我带到 a discussion about an issue that seems to describe a similar problem. They also discuss the problems that occur when a response is not processed fast enough. The associated merge request will bring documentation changes that propose to completely consume the
HttpResponse
before continuing with the stream. In the discussion to the issue there are also doubts about whether or not it's a good idea to combine Akka Http with Akka Streams。所以也许我必须更改实现以直接在 unfoldAsync
. 调用的函数内部进行解组
根据Source.unfoldAsync
的implementation,传入的函数仅在拉取源时调用:
def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
因此,如果下游没有拉动(背压),则不会调用传入源的函数。
在您的要点中,您使用 runForeach
(与 runWith(Sink.foreach)
相同)在 println
完成后立即拉动上游。所以这里很难注意到背压。
尝试将您的示例更改为 runWith(Sink.queue)
,这将为您提供 SinkQueueWithCancel
作为物化值。然后,除非您在队列上调用 pull
,否则流将被背压并且不会发出请求。
请注意,在背压传播到所有流之前,可能会有一个或多个初始请求。
我想我明白了。正如我在问题编辑中已经提到的,我发现 this comment 是 Akka HTTP 中的一个问题,作者说:
...it is simply not best practice to mix Akka http into a larger processing stream. Instead, you need a boundary around the Akka http parts of the stream that ensures they always consume their response before allowing the outer processing stream to proceed.
所以我继续尝试:我没有在流的不同阶段进行 HTTP 请求和解组,而是通过 flatMap
将 Future[HttpResponse]
直接解组为 Future[Multipart.General]
。这确保 HttpResponse
被直接消耗并避免 Response entity was not subscribed after 1 second
错误。 crawl
函数现在看起来略有不同,因为它必须 return 未编组的 Multipart.General
object (用于进一步处理)以及原始 HttpResponse
(以能够从 headers):
构造下一个请求
def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
reqOption match {
case Some(request) =>
Http().singleRequest(request)
.flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
.map {
case tuple@(response, multipart) =>
if (response.status.isFailure()) Some((None, tuple))
else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
}
case None => Future.successful(None)
}
}
其余代码必须因此更改。我创建了 another gist,其中包含与原始问题的要点相同的等效代码。
我期待这两个 Akka 项目能更好地集成(文档目前没有提到这个限制,相反 HTTP API 似乎鼓励用户一起使用 Akka HTTP 和 Akka Streams ),所以这感觉有点像解决方法,但它暂时解决了我的问题。在将这部分集成到我的更大的用例中时,我仍然需要弄清楚我遇到的一些其他问题,但这不是这里问题的一部分。
我目前正在尝试读取分页的 HTTP 资源。每个页面都是一个多部分文档,如果有包含更多内容的页面,则该页面的响应会在 headers 中包含一个 next
link。然后,自动解析器可以从最旧的页面开始,然后使用 headers 逐页读取以构造对下一页的请求。
我正在使用 Akka Streams 和 Akka Http 来实现,因为我的目标是创建流式处理解决方案。我想出了这个(我将在这里只包含代码的相关部分,请随意查看整个代码 this gist):
def read(request: HttpRequest): Source[HttpResponse, _] =
Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)
val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
.flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
.flatMapConcat(_.parts)
....
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
case Some(req) =>
Http().singleRequest(req).map { response =>
if (response.status.isFailure()) Some((None, response))
else nextRequest(response, HttpMethods.GET)
}
case None => Future.successful(None)
}
所以一般的想法是使用 Source.unfoldAsync
来爬取页面并进行 HTTP 请求(这个想法和实现与 Source[HttpResponse, _]
然后可以使用(Unmarshal 到 Multipart,分成单独的部分,...)。
我现在的问题是 HttpResponse
s 的消耗可能需要一段时间(如果页面很大,解组需要一些时间,最后可能会有一些数据库请求来持久化一些数据, ...)。所以如果下游速度较慢,我希望 Source.unfoldAsync
背压。默认情况下,下一个 HTTP 请求将在上一个请求完成后立即开始。
所以我的问题是:有什么方法可以对缓慢的下游产生 Source.unfoldAsync
背压吗?如果没有,是否有替代方案可以实现背压?
我可以想象一个利用 akka-http 提供的 Host-Level Client-Side API 的解决方案,如 here 所述以及循环图其中第一个请求的响应将用作生成第二个请求的输入,但我还没有尝试过,我不确定这是否可行。
编辑: 在玩了几天并阅读了文档和一些博客之后,我不确定我是否在正确的轨道上假设背压行为Source.unfoldAsync
是根本原因。要添加更多观察结果:
- 当流开始时,我看到发出了几个请求。这首先没有问题,只要及时消耗生成的
HttpResponse
(有关说明,请参阅 here) - 如果我不更改默认值
response-entity-subscription-timeout
,我将 运行 出现以下错误(我删除了 URL):
[WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
这导致IllegalStateException
终止流:java.lang.IllegalStateException: Substream Source cannot be materialized more than once
- 我观察到响应的解组是流中最慢的部分,这可能是有道理的,因为响应 body 是一个多部分文档,因此相对较大。但是,我希望流的这一部分向上游发出更少的需求信号(在我的例子中是
Source.unfoldAsync
部分)。这应该会导致请求减少。 - 一些谷歌搜索将我带到 a discussion about an issue that seems to describe a similar problem. They also discuss the problems that occur when a response is not processed fast enough. The associated merge request will bring documentation changes that propose to completely consume the
HttpResponse
before continuing with the stream. In the discussion to the issue there are also doubts about whether or not it's a good idea to combine Akka Http with Akka Streams。所以也许我必须更改实现以直接在unfoldAsync
. 调用的函数内部进行解组
根据Source.unfoldAsync
的implementation,传入的函数仅在拉取源时调用:
def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
因此,如果下游没有拉动(背压),则不会调用传入源的函数。
在您的要点中,您使用 runForeach
(与 runWith(Sink.foreach)
相同)在 println
完成后立即拉动上游。所以这里很难注意到背压。
尝试将您的示例更改为 runWith(Sink.queue)
,这将为您提供 SinkQueueWithCancel
作为物化值。然后,除非您在队列上调用 pull
,否则流将被背压并且不会发出请求。
请注意,在背压传播到所有流之前,可能会有一个或多个初始请求。
我想我明白了。正如我在问题编辑中已经提到的,我发现 this comment 是 Akka HTTP 中的一个问题,作者说:
...it is simply not best practice to mix Akka http into a larger processing stream. Instead, you need a boundary around the Akka http parts of the stream that ensures they always consume their response before allowing the outer processing stream to proceed.
所以我继续尝试:我没有在流的不同阶段进行 HTTP 请求和解组,而是通过 flatMap
将 Future[HttpResponse]
直接解组为 Future[Multipart.General]
。这确保 HttpResponse
被直接消耗并避免 Response entity was not subscribed after 1 second
错误。 crawl
函数现在看起来略有不同,因为它必须 return 未编组的 Multipart.General
object (用于进一步处理)以及原始 HttpResponse
(以能够从 headers):
def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
reqOption match {
case Some(request) =>
Http().singleRequest(request)
.flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
.map {
case tuple@(response, multipart) =>
if (response.status.isFailure()) Some((None, tuple))
else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
}
case None => Future.successful(None)
}
}
其余代码必须因此更改。我创建了 another gist,其中包含与原始问题的要点相同的等效代码。
我期待这两个 Akka 项目能更好地集成(文档目前没有提到这个限制,相反 HTTP API 似乎鼓励用户一起使用 Akka HTTP 和 Akka Streams ),所以这感觉有点像解决方法,但它暂时解决了我的问题。在将这部分集成到我的更大的用例中时,我仍然需要弄清楚我遇到的一些其他问题,但这不是这里问题的一部分。