为什么 akka http 不为前 N 个请求发出响应?
Why akka http is not emiting responses for first N requests?
我正在尝试使用 akka-http 向单个主机发出 http 请求(例如 "akka.io")。问题是创建的流 (Http().cachedHostConnectionPool) 仅在发出 N 个 http 请求后才开始发出响应,其中 N 等于最大连接数。
import scala.util.Failure
import scala.util.Success
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object ConnectionPoolExample extends App {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val config = ConfigFactory.load()
val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10)
lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings)
val fakeSource = Source.fromIterator[Unit] { () => Iterator.continually { Thread.sleep(1000); () } }
val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) }
val responses = requests.via(poolClientFlow)
responses.runForeach {
case (tryResponse, jsonData) =>
tryResponse match {
case Success(httpResponse) =>
httpResponse.entity.dataBytes.runWith(Sink.ignore)
println(s"status: ${httpResponse.status}")
case Failure(e) => {
println(e)
}
}
}
}
输出如下所示:
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
status: 200 OK
Creating request
status: 200 OK
Creating request
status: 200 OK
...
我找不到任何允许在响应准备就绪时发出响应的配置参数,而不是在池没有可用连接时发出响应。
谢谢!
原因是您通过调用 Thread.sleep
来阻止客户端执行其他工作——该方法在反应式程序中是被禁止的。正确且更简单的方法是使用 Source.tick
.
我正在尝试使用 akka-http 向单个主机发出 http 请求(例如 "akka.io")。问题是创建的流 (Http().cachedHostConnectionPool) 仅在发出 N 个 http 请求后才开始发出响应,其中 N 等于最大连接数。
import scala.util.Failure
import scala.util.Success
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object ConnectionPoolExample extends App {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val config = ConfigFactory.load()
val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10)
lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings)
val fakeSource = Source.fromIterator[Unit] { () => Iterator.continually { Thread.sleep(1000); () } }
val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) }
val responses = requests.via(poolClientFlow)
responses.runForeach {
case (tryResponse, jsonData) =>
tryResponse match {
case Success(httpResponse) =>
httpResponse.entity.dataBytes.runWith(Sink.ignore)
println(s"status: ${httpResponse.status}")
case Failure(e) => {
println(e)
}
}
}
}
输出如下所示:
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
status: 200 OK
Creating request
status: 200 OK
Creating request
status: 200 OK
...
我找不到任何允许在响应准备就绪时发出响应的配置参数,而不是在池没有可用连接时发出响应。
谢谢!
原因是您通过调用 Thread.sleep
来阻止客户端执行其他工作——该方法在反应式程序中是被禁止的。正确且更简单的方法是使用 Source.tick
.