Akka Streams - Source.unfoldAsync 的背压

Akka Streams - Backpressure for Source.unfoldAsync

我目前正在尝试读取分页的 HTTP 资源。每个页面都是一个多部分文档,如果有包含更多内容的页面,则该页面的响应会在 headers 中包含一个 next link。然后,自动解析器可以从最旧的页面开始,然后使用 headers 逐页读取以构造对下一页的请求。

我正在使用 Akka Streams 和 Akka Http 来实现,因为我的目标是创建流式处理解决方案。我想出了这个(我将在这里只包含代码的相关部分,请随意查看整个代码 this gist):

def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}

所以一般的想法是使用 Source.unfoldAsync 来爬取页面并进行 HTTP 请求(这个想法和实现与 中描述的非常接近)。这将创建一个Source[HttpResponse, _] 然后可以使用(Unmarshal 到 Multipart,分成单独的部分,...)。

我现在的问题是 HttpResponses 的消耗可能需要一段时间(如果页面很大,解组需要一些时间,最后可能会有一些数据库请求来持久化一些数据, ...)。所以如果下游速度较慢,我希望 Source.unfoldAsync 背压。默认情况下,下一个 HTTP 请求将在上一个请求完成后立即开始。

所以我的问题是:有什么方法可以对缓慢的下游产生 Source.unfoldAsync 背压吗?如果没有,是否有替代方案可以实现背压?

我可以想象一个利用 akka-http 提供的 Host-Level Client-Side API 的解决方案,如 here 所述以及循环图其中第一个请求的响应将用作生成第二个请求的输入,但我还没有尝试过,我不确定这是否可行。


编辑: 在玩了几天并阅读了文档和一些博客之后,我不确定我是否在正确的轨道上假设背压行为Source.unfoldAsync 是根本原因。要添加更多观察结果:

根据Source.unfoldAsyncimplementation,传入的函数仅在拉取源时调用:

def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

因此,如果下游没有拉动(背压),则不会调用传入源的函数。

在您的要点中,您使用 runForeach(与 runWith(Sink.foreach) 相同)在 println 完成后立即拉动上游。所以这里很难注意到背压。

尝试将您的示例更改为 runWith(Sink.queue),这将为您提供 SinkQueueWithCancel 作为物化值。然后,除非您在队列上调用 pull,否则流将被背压并且不会发出请求。

请注意,在背压传播到所有流之前,可能会有一个或多个初始请求。

我想我明白了。正如我在问题编辑中已经提到的,我发现 this comment 是 Akka HTTP 中的一个问题,作者说:

...it is simply not best practice to mix Akka http into a larger processing stream. Instead, you need a boundary around the Akka http parts of the stream that ensures they always consume their response before allowing the outer processing stream to proceed.

所以我继续尝试:我没有在流的不同阶段进行 HTTP 请求和解组,而是通过 flatMapFuture[HttpResponse] 直接解组为 Future[Multipart.General]。这确保 HttpResponse 被直接消耗并避免 Response entity was not subscribed after 1 second 错误。 crawl 函数现在看起来略有不同,因为它必须 return 未编组的 Multipart.General object (用于进一步处理)以及原始 HttpResponse (以能够从 headers):

构造下一个请求
def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
  reqOption match {
    case Some(request) =>
      Http().singleRequest(request)
        .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
        .map {
          case tuple@(response, multipart) =>
            if (response.status.isFailure()) Some((None, tuple))
            else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
    case None => Future.successful(None)
  }
}

其余代码必须因此更改。我创建了 another gist,其中包含与原始问题的要点相同的等效代码。

我期待这两个 Akka 项目能更好地集成(文档目前没有提到这个限制,相反 HTTP API 似乎鼓励用户一起使用 Akka HTTP 和 Akka Streams ),所以这感觉有点像解决方法,但它暂时解决了我的问题。在将这部分集成到我的更大的用例中时,我仍然需要弄清楚我遇到的一些其他问题,但这不是这里问题的一部分。