Akka BufferOverflowException when sending many singleRequest calls
Here is my code:
Future.sequence((2 to firstPage.pages).map { count =>
  getCommentPage(av, count)
}).map { pages =>
  // do something
}
Inside getCommentPage, an Http().singleRequest call is used to fetch the data, like this:
val responseFuture: Future[HttpResponse] =
  Http(system).singleRequest(HttpRequest(GET, uri = requestUri))
responseFuture
  .map(_.entity)
  .flatMap(_.toStrict(10 seconds)(materializer))
  .map(_.data)
  .map(_.utf8String)
  .map((jsonString: String) => {
    // do something to extract data
  })
This works fine when firstPage.pages is small, but when firstPage.pages is large (around 50 or more), the following exception is thrown:
akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [32]. This means that the request queue of this pool (HostConnectionPoolSetup(api.bilibili.cn,80,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.0.9),10 seconds,1 minute,512,None,<function0>,List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Full,Error,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0, User-Agent -> 32),false,akka.stream.impl.ConstantFun$$$Lambda4/19208387@4780bf,akka.stream.impl.ConstantFun$$$Lambda4/19208387@4780bf,akka.stream.impl.ConstantFun$$$Lambda5/6903324@1d25a2e),None),TCPTransport),akka.http.scaladsl.HttpConnectionContext$@796a3e,akka.event.MarkerLoggingAdapter@1cc552a))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
How can I fix this?
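Why not go to the page linked in the error message?
http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html
You need to set akka.http.host-connection-pool.max-open-requests in application.conf
and increase it from 32 to a higher number (it has to be a power of 2, for example 64). The related akka.http.host-connection-pool.max-connections setting, which is 4 in the settings dump above, controls how many connections the pool opens to the host.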
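As a rough sketch of the configuration change described above, the same keys can also be layered over the loaded configuration from code. The object name PoolConfigSketch and the values 8 and 64 are illustrative assumptions, not recommendations; the only grounded parts are the two setting names.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object PoolConfigSketch {
  // Illustrative overrides; 8 and 64 are assumed values, tune them for your workload.
  // max-open-requests has to be a power of 2.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.http.host-connection-pool {
      |  max-connections   = 8
      |  max-open-requests = 64
      |}
    """.stripMargin)

  // Everything not overridden still comes from application.conf / reference.conf.
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting config could then be passed when creating the actor system, for example ActorSystem("crawler", PoolConfigSketch.config), where "crawler" is a made-up name. Putting the same two keys directly into application.conf has the same effect.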
What you are looking for is a dedicated host connection pool.
Simplified, the code looks like this:
val poolClientFlow =
  Http().cachedHostConnectionPool[HttpRequest](host, port)

def performRequest(request: HttpRequest): Future[HttpResponse] =
  Source
    .single(request -> request) // the pool flow expects (request, context) pairs
    .via(poolClientFlow)
    .mapAsync(1) {
      case (response, _) =>
        Future.fromTry(response) // response is a Try[HttpResponse]
    }
    .runWith(Sink.head)
Make sure you call
response.discardEntityBytes()
or simply unmarshal the entity, to prevent resource leaks.
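One way to follow that advice, as a minimal sketch: every branch either reads the entity or discards it. The helper name bodyOrFail, the 10 second timeout, and the decision to treat anything other than 200 OK as a failure are assumptions for illustration only.

```scala
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object EntityHandlingSketch {
  // Hypothetical helper: returns the body as a String for 200 OK,
  // otherwise discards the entity and fails the Future.
  def bodyOrFail(response: HttpResponse)(
      implicit mat: Materializer, ec: ExecutionContext): Future[String] =
    response.status match {
      case StatusCodes.OK =>
        // Reading the whole entity consumes it, so the connection is freed.
        response.entity.toStrict(10.seconds).map(_.data.utf8String)
      case other =>
        // The body is not needed here, but it still has to be drained.
        response.discardEntityBytes()
        Future.failed(new RuntimeException(s"Unexpected status: $other"))
    }
}
```

With a helper like this, the result of performRequest could be chained as performRequest(request).flatMap(EntityHandlingSketch.bodyOrFail(_)), given an implicit Materializer and ExecutionContext in scope.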
The docs warn that this is an anti-pattern, but it actually works well if you have enough memory and don't need any queue management.
You can try:
val result = Source(1 to 10).mapAsyncUnordered(parallelism = 5) { count =>
getCommentPage(av, count)
}.runWith(Sink.seq)
Note that the parallelism value should be smaller than akka.http.host-connection-pool.max-connections.
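To illustrate why this helps, here is a small sketch that replaces the real HTTP call with a stub and records how many calls are in flight at once. fakeGetCommentPage, the 20 ms delay, and the object name are assumptions for the demo; ActorMaterializer matches the Akka versions of that era and is not needed on newer ones.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BoundedParallelismSketch {

  /** Returns the fetched page numbers and the highest number of calls seen in flight at once. */
  def run(lastPage: Int, parallelism: Int): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("bounded-parallelism-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val lock = new Object
    var inFlight = 0
    var maxInFlight = 0

    // Hypothetical stand-in for getCommentPage: no HTTP, just a short delay.
    def fakeGetCommentPage(page: Int): Future[Int] = Future {
      lock.synchronized {
        inFlight += 1
        maxInFlight = math.max(maxInFlight, inFlight)
      }
      Thread.sleep(20) // simulated latency; blocking is acceptable only in a sketch
      lock.synchronized { inFlight -= 1 }
      page
    }

    try {
      val pages = Await.result(
        Source(2 to lastPage)
          .mapAsyncUnordered(parallelism)(fakeGetCommentPage)
          .runWith(Sink.seq),
        30.seconds)
      (pages, lock.synchronized(maxInFlight))
    } finally {
      system.terminate()
    }
  }
}
```

Unlike Future.sequence over the whole range, which starts every request immediately, the stream only asks for the next page once one of the outstanding futures has completed, so the pool never sees more than parallelism requests from this code at a time.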