通过连接池发出 http 请求时 Akka Flow 挂起
Akka Flow hangs when making http requests via connection pool
我正在使用 Akka 2.4.4 并尝试从 Apache HttpAsyncClient 迁移(未成功)。
以下是我在项目中使用的代码的简化版本。
问题是,如果我向流发送超过 1-3 个请求,它就会挂起。到目前为止,经过 6 个小时的调试,我什至找不到问题所在。我在 Decider
中没有看到异常、错误日志和事件。没有:)
我尝试将 connection-timeout
设置减少到 1s,认为它可能正在等待服务器的响应,但没有帮助。
我做错了什么?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =
Source.fromIterator(() => items.toIterator)
.via(poolClientFlow)
.log("Logger")(log = myAdapter)
.recoverWith {
case ex =>
println(ex)
null
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.map { v =>
println(s"Got ${v.length} responses in Flow")
v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
}
def main(args: Array[String]) {
val headers = imSeq(Referer("https://www.google.com/"))
val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
val requests = List.fill(10)(reqPair)
val qwe = sendMultipleRequests(requests).map { case responses =>
println(s"Got ${responses.length} responses")
system.terminate()
}
Await.ready(system.whenTerminated, Duration.Inf)
}
}
还有 proxy support 是怎么回事?似乎也不适合我。
您需要完全使用响应正文,以便连接可用于后续请求。如果你根本不关心响应实体,那么你可以将它排干到 Sink.ignore
,像这样:
resp.entity.dataBytes.runWith(Sink.ignore)
根据默认配置,当使用主机连接池时,最大连接数设置为 4。每个池都有自己的队列,请求在其中等待一个打开的连接可用。如果该队列超过 32(默认配置,可以更改,必须是 2 的幂),那么您将开始看到故障。在你的例子中,你只做了 10 个请求,所以你没有达到这个限制。但是,通过不使用响应实体,您不会释放连接,其他所有内容都在后面排队,等待连接释放。
我正在使用 Akka 2.4.4 并尝试从 Apache HttpAsyncClient 迁移(未成功)。
以下是我在项目中使用的代码的简化版本。
问题是,如果我向流发送超过 1-3 个请求,它就会挂起。到目前为止,经过 6 个小时的调试,我什至找不到问题所在。我在 Decider
中没有看到异常、错误日志和事件。没有:)
我尝试将 connection-timeout
设置减少到 1s,认为它可能正在等待服务器的响应,但没有帮助。
我做错了什么?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =
Source.fromIterator(() => items.toIterator)
.via(poolClientFlow)
.log("Logger")(log = myAdapter)
.recoverWith {
case ex =>
println(ex)
null
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.map { v =>
println(s"Got ${v.length} responses in Flow")
v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
}
def main(args: Array[String]) {
val headers = imSeq(Referer("https://www.google.com/"))
val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
val requests = List.fill(10)(reqPair)
val qwe = sendMultipleRequests(requests).map { case responses =>
println(s"Got ${responses.length} responses")
system.terminate()
}
Await.ready(system.whenTerminated, Duration.Inf)
}
}
还有 proxy support 是怎么回事?似乎也不适合我。
您需要完全使用响应正文,以便连接可用于后续请求。如果你根本不关心响应实体,那么你可以将它排干到 Sink.ignore
,像这样:
resp.entity.dataBytes.runWith(Sink.ignore)
根据默认配置,当使用主机连接池时,最大连接数设置为 4。每个池都有自己的队列,请求在其中等待一个打开的连接可用。如果该队列超过 32(默认配置,可以更改,必须是 2 的幂),那么您将开始看到故障。在你的例子中,你只做了 10 个请求,所以你没有达到这个限制。但是,通过不使用响应实体,您不会释放连接,其他所有内容都在后面排队,等待连接释放。