Akka HTTP 客户端和 Akka actor 性能调优
Akka HTTP client and Akka actor performance tuning
我们从使用 Camel HTTP4 转移到 Akka HTTP,虽然我们现在能够更好地控制错误,但考虑到 Akka HTTP(客户端)中的所有可调整参数,要获得更好的性能真的很难。
我们有一个 actor,它接收消息,向外部服务发出 HTTP GET 请求(能够轻松管理超过 1500 RPS),然后用字符串形式的 http 响应正文进行响应。
我们现在的上限是 650 RPS,即使我们没有收到错误(就像之前使用 Camel 的 HTTP4 一样),我们也不能超过这 650 个(与之前使用默认参数的 HTTP4 的 800 RPS 不同)。
我们的 HTTP 请求是使用 singleRequest 发出的:
val httpResponseFuture: Future[HttpResponse] = http.singleRequest(HttpRequest(uri = uri))
val tokenizationResponse = for {
response <- httpResponseFuture
body <- Unmarshal(response.entity).to[String]
} yield transformResponse(response.status, body, path, response.headers)
然后这些是产生最佳结果的设置(检查这些数字并没有显示任何真正的改进:
akka {
actor.deployment {
/HttpClient {
router = balancing-pool
nr-of-instances = 7
}
}
http {
host-connection-pool {
max-connections = 30
max-retries = 5
max-open-requests = 8192
pipelining-limit = 200
idle-timeout = 30 s
}
}
}
我们尝试调整池的大小、actor 实例以及主机连接池下的所有其他参数,但我们无法获得更好的结果。
欢迎提出任何建议!
不要混搭并发
大概您的查询功能只是向 Actor
发送消息并等待响应:
//what your code may look like now
object Message
val queryActorRef : ActorRef = ???
val responseBody : Future[String] = (queryActorRef ? Message).mapTo[String]
但这是不必要的。在此用例中使用 Actor
的唯一原因是保护有限的资源。但是底层的 http 连接池会为你处理资源利用。删除 Actor 中介将允许您单独使用 Futures:
val entityTimeout : FiniteDuration = 10.seconds
val responseBodyWithoutAnActor : Future[String] =
http
.singleRequest(HttpRequest(uri = uri))
.flatMap(response => response.entity.toStrict(timeout))
.map(_.data.utf8String)
流
如果发送给 Actor 的 "messages" 有底层来源,例如一个 Iterable
,那么你可以改用流式传输:
type Message = ???
val messagesSource : Iterable[Message] = ???
val uri : String = ???
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](uri)
val entityParallelism = 10
Source
.apply(messagesSource)
.via(poolClientFlow)
.mapAsync(entityParallelism)(resp.entity.toStrict(entityTimeout).data.utf8String)
.runForeach { responseBody : String =>
//whatever you do with the bodies
}
我们从使用 Camel HTTP4 转移到 Akka HTTP,虽然我们现在能够更好地控制错误,但考虑到 Akka HTTP(客户端)中的所有可调整参数,要获得更好的性能真的很难。
我们有一个 actor,它接收消息,向外部服务发出 HTTP GET 请求(能够轻松管理超过 1500 RPS),然后用字符串形式的 http 响应正文进行响应。
我们现在的上限是 650 RPS,即使我们没有收到错误(就像之前使用 Camel 的 HTTP4 一样),我们也不能超过这 650 个(与之前使用默认参数的 HTTP4 的 800 RPS 不同)。
我们的 HTTP 请求是使用 singleRequest 发出的:
val httpResponseFuture: Future[HttpResponse] = http.singleRequest(HttpRequest(uri = uri))
val tokenizationResponse = for {
response <- httpResponseFuture
body <- Unmarshal(response.entity).to[String]
} yield transformResponse(response.status, body, path, response.headers)
然后这些是产生最佳结果的设置(检查这些数字并没有显示任何真正的改进:
akka {
actor.deployment {
/HttpClient {
router = balancing-pool
nr-of-instances = 7
}
}
http {
host-connection-pool {
max-connections = 30
max-retries = 5
max-open-requests = 8192
pipelining-limit = 200
idle-timeout = 30 s
}
}
}
我们尝试调整池的大小、actor 实例以及主机连接池下的所有其他参数,但我们无法获得更好的结果。
欢迎提出任何建议!
不要混搭并发
大概您的查询功能只是向 Actor
发送消息并等待响应:
//what your code may look like now
object Message
val queryActorRef : ActorRef = ???
val responseBody : Future[String] = (queryActorRef ? Message).mapTo[String]
但这是不必要的。在此用例中使用 Actor
的唯一原因是保护有限的资源。但是底层的 http 连接池会为你处理资源利用。删除 Actor 中介将允许您单独使用 Futures:
val entityTimeout : FiniteDuration = 10.seconds
val responseBodyWithoutAnActor : Future[String] =
http
.singleRequest(HttpRequest(uri = uri))
.flatMap(response => response.entity.toStrict(timeout))
.map(_.data.utf8String)
流
如果发送给 Actor 的 "messages" 有底层来源,例如一个 Iterable
,那么你可以改用流式传输:
type Message = ???
val messagesSource : Iterable[Message] = ???
val uri : String = ???
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](uri)
val entityParallelism = 10
Source
.apply(messagesSource)
.via(poolClientFlow)
.mapAsync(entityParallelism)(resp.entity.toStrict(entityTimeout).data.utf8String)
.runForeach { responseBody : String =>
//whatever you do with the bodies
}