在 Stream 中链接 Akka-http-client 请求
Chain Akka-http-client requests in a Stream
我想使用 akka-http-client 作为 Stream 链接 http 请求。链中的每个 http 请求都依赖于先前请求的 success/response,并使用它来构造新请求。如果请求不成功,Stream 应该 return 不成功请求的响应。
如何在 akka-http 中构建这样的流?
我应该使用哪个 akka-http 客户端级别 API?
如果您正在制作网络爬虫,请查看 this post。这个答案解决了一个更简单的情况,例如下载分页资源,其中下一页的 link 在当前页面响应的 header 中。
您可以使用 Source.unfoldAsync
方法创建链式源 - 一个项目指向下一个项目。这需要一个函数,该函数需要一个元素 S
和 returns Future[Option[(S, E)]]
来确定流是否应该继续发出类型 E
的元素,将状态传递给下一个调用。
在你的情况下,这有点像:
- 接受初始
HttpRequest
- 产生一个
Future[HttpResponse]
- 如果响应指向另一个 URL,returning
Some(request -> response)
,否则 None
但是,有一个问题,那就是如果它不包含指向下一个请求的指针,它将不会从流发出响应。
要解决这个问题,您可以将函数传递给 unfoldAsync
return Future[Option[(Option[HttpRequest], HttpResponse)]]
。这使您可以处理以下情况:
- 当前响应错误
- 当前响应指向另一个请求
- 当前响应没有指向另一个请求
下面是一些概述此方法的注释代码,但首先是初步的:
当流式传输 HTTP 请求到 Akka 流中的响应时,您需要确保响应 body 被消耗,否则会发生不好的事情(死锁等)。如果你不这样做不需要 body 你可以忽略它,但这里我们使用一个函数将 HttpEntity
从(潜在的)流转换为严格的实体:
import scala.concurrent.duration._
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
接下来,几个函数从 HttpResponse
创建 Option[HttpRequest]
。此示例使用类似 Github 的分页 links 的方案,其中 Links
header 包含,例如:<https://api.github.com/...> rel="next"
:
def nextUri(r: HttpResponse): Seq[Uri] = for {
linkHeader <- r.header[Link].toSeq
value <- linkHeader.values
params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri
def getNextRequest(r: HttpResponse): Option[HttpRequest] =
nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
接下来,我们将传递给 unfoldAsync
的真正函数。它使用 Akka HTTP Http().singleRequest()
API 获取 HttpRequest
并生成 Future[HttpResponse]
:
def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
reqOption match {
case Some(req) => Http().singleRequest(req).flatMap { response =>
// handle the error case. Here we just return the errored response
// with no next item.
if (response.status.isFailure()) Future.successful(Some(None -> response))
// Otherwise, convert the response to a strict response by
// taking up the body and looking for a next request.
else convertToStrict(response).map { strictResponse =>
getNextRequest(strictResponse) match {
// If we have no next request, return Some containing an
// empty state, but the current value
case None => Some(None -> strictResponse)
// Otherwise, pass on the request...
case next => Some(next -> strictResponse)
}
}
}
// Finally, there's no next request, end the stream by
// returning none as the state.
case None => Future.successful(None)
}
请注意,如果我们收到错误响应,流将不会继续,因为我们 return None
处于下一个状态。
您可以调用它来获取 HttpResponse
objects 的流,如下所示:
val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest)(chainRequests)
至于 return 最后一个(或错误的)响应的值,您只需使用 Sink.last
,因为流将在成功完成或第一个错误时结束回复。例如:
def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest))(chainRequests)
.map(_.status)
.runWith(Sink.last)
我想使用 akka-http-client 作为 Stream 链接 http 请求。链中的每个 http 请求都依赖于先前请求的 success/response,并使用它来构造新请求。如果请求不成功,Stream 应该 return 不成功请求的响应。
如何在 akka-http 中构建这样的流? 我应该使用哪个 akka-http 客户端级别 API?
如果您正在制作网络爬虫,请查看 this post。这个答案解决了一个更简单的情况,例如下载分页资源,其中下一页的 link 在当前页面响应的 header 中。
您可以使用 Source.unfoldAsync
方法创建链式源 - 一个项目指向下一个项目。这需要一个函数,该函数需要一个元素 S
和 returns Future[Option[(S, E)]]
来确定流是否应该继续发出类型 E
的元素,将状态传递给下一个调用。
在你的情况下,这有点像:
- 接受初始
HttpRequest
- 产生一个
Future[HttpResponse]
- 如果响应指向另一个 URL,returning
Some(request -> response)
,否则None
但是,有一个问题,那就是如果它不包含指向下一个请求的指针,它将不会从流发出响应。
要解决这个问题,您可以将函数传递给 unfoldAsync
return Future[Option[(Option[HttpRequest], HttpResponse)]]
。这使您可以处理以下情况:
- 当前响应错误
- 当前响应指向另一个请求
- 当前响应没有指向另一个请求
下面是一些概述此方法的注释代码,但首先是初步的:
当流式传输 HTTP 请求到 Akka 流中的响应时,您需要确保响应 body 被消耗,否则会发生不好的事情(死锁等)。如果你不这样做不需要 body 你可以忽略它,但这里我们使用一个函数将 HttpEntity
从(潜在的)流转换为严格的实体:
import scala.concurrent.duration._
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
接下来,几个函数从 HttpResponse
创建 Option[HttpRequest]
。此示例使用类似 Github 的分页 links 的方案,其中 Links
header 包含,例如:<https://api.github.com/...> rel="next"
:
def nextUri(r: HttpResponse): Seq[Uri] = for {
linkHeader <- r.header[Link].toSeq
value <- linkHeader.values
params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri
def getNextRequest(r: HttpResponse): Option[HttpRequest] =
nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
接下来,我们将传递给 unfoldAsync
的真正函数。它使用 Akka HTTP Http().singleRequest()
API 获取 HttpRequest
并生成 Future[HttpResponse]
:
def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
reqOption match {
case Some(req) => Http().singleRequest(req).flatMap { response =>
// handle the error case. Here we just return the errored response
// with no next item.
if (response.status.isFailure()) Future.successful(Some(None -> response))
// Otherwise, convert the response to a strict response by
// taking up the body and looking for a next request.
else convertToStrict(response).map { strictResponse =>
getNextRequest(strictResponse) match {
// If we have no next request, return Some containing an
// empty state, but the current value
case None => Some(None -> strictResponse)
// Otherwise, pass on the request...
case next => Some(next -> strictResponse)
}
}
}
// Finally, there's no next request, end the stream by
// returning none as the state.
case None => Future.successful(None)
}
请注意,如果我们收到错误响应,流将不会继续,因为我们 return None
处于下一个状态。
您可以调用它来获取 HttpResponse
objects 的流,如下所示:
val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest)(chainRequests)
至于 return 最后一个(或错误的)响应的值,您只需使用 Sink.last
,因为流将在成功完成或第一个错误时结束回复。例如:
def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest))(chainRequests)
.map(_.status)
.runWith(Sink.last)