play.api.libs.ws 的批处理请求
Batching requests with play.api.libs.ws
我有一个脚本可以发出大量网络请求 (~300000)。看起来像这样
// Setup a new wsClient
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
val wsClient = new NingWSClient(builder.build)
// Each of these use the wsClient
def getAs: Future[Seq[A]] = { ... }
def getBs: Future[Seq[B]] = { ... }
def getCs: Future[Seq[C]] = { ... }
def getDs: Future[Seq[D]] = { ... }
(for {
as <- getAs
bs <- getBs
cs <- getCs
ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))
问题是我会 运行 进入 Too many open files
异常,因为每个函数都异步发出数千个请求,每个请求都使用一个文件描述符。
我尝试重新组织我的函数,以便每个函数都可以与自己的客户进行批处理:
def getAs: Future[Seq[A]] = {
someCollection.group(1000).map(batch => {
val client = new NingWSClient(builder.build) // Make a new client for every batch
Future.sequence(batch.map(thing => {
wsClient.url(...).map(...)
})).map(things => {
wsClient.close // Close the client
things
})
})
}
但这会导致 for-comprehension 提前结束(没有任何错误消息或异常):
(for {
as <- getAs
bs <- getBs // This doesn't happen
cs <- getCs // Or any of the following ones
ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))
我只是在寻找无需打开太多文件描述符即可发出大量 http 请求的正确方法。
我有一个类似的问题,对一个网络服务的请求太多(~500+)。
您的分组代码示例几乎是正确的,但是,您将得到 Iterator[Future[List[Int]]]
或者如果您 Future.sequence
-d 它 Future[Iterator[List[Int]]]
。但是,我认为他们 all 将 运行 异步。您需要触发第一批,然后 flatMap
它(等到它完成),然后再触发下一批。这是我在 this answer:
之后设法写的
val futureIterator = list.grouped(50).foldLeft(Future.successful[List[Int]](Nil)) {
(fItems, items) =>
fItems flatMap { processed =>
println("PROCESSED: " + processed); println("SPAWNED: " + items);
Future.traverse(items)(getFuture) map (res => processed ::: res)
}
}
println(Await.result(futureIterator, Duration.Inf))
希望对您有所帮助!
您可以使用 Octoparts:
https://m3dev.github.io/octoparts/
但听起来您确实想要反转模式,因此 wsClient 在外部进行调用,然后平面映射 Future[WSResponse] 返回。这将限制 AsyncHttpClient 使用的内部 Netty 线程池的期货数量,您可以更改配置设置以增加或减少 Netty 通道池中的线程数量。
我有一个脚本可以发出大量网络请求 (~300000)。看起来像这样
// Setup a new wsClient
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
val wsClient = new NingWSClient(builder.build)
// Each of these use the wsClient
def getAs: Future[Seq[A]] = { ... }
def getBs: Future[Seq[B]] = { ... }
def getCs: Future[Seq[C]] = { ... }
def getDs: Future[Seq[D]] = { ... }
(for {
as <- getAs
bs <- getBs
cs <- getCs
ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))
问题是我会 运行 进入 Too many open files
异常,因为每个函数都异步发出数千个请求,每个请求都使用一个文件描述符。
我尝试重新组织我的函数,以便每个函数都可以与自己的客户进行批处理:
def getAs: Future[Seq[A]] = {
someCollection.group(1000).map(batch => {
val client = new NingWSClient(builder.build) // Make a new client for every batch
Future.sequence(batch.map(thing => {
wsClient.url(...).map(...)
})).map(things => {
wsClient.close // Close the client
things
})
})
}
但这会导致 for-comprehension 提前结束(没有任何错误消息或异常):
(for {
as <- getAs
bs <- getBs // This doesn't happen
cs <- getCs // Or any of the following ones
ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))
我只是在寻找无需打开太多文件描述符即可发出大量 http 请求的正确方法。
我有一个类似的问题,对一个网络服务的请求太多(~500+)。
您的分组代码示例几乎是正确的,但是,您将得到 Iterator[Future[List[Int]]]
或者如果您 Future.sequence
-d 它 Future[Iterator[List[Int]]]
。但是,我认为他们 all 将 运行 异步。您需要触发第一批,然后 flatMap
它(等到它完成),然后再触发下一批。这是我在 this answer:
val futureIterator = list.grouped(50).foldLeft(Future.successful[List[Int]](Nil)) {
(fItems, items) =>
fItems flatMap { processed =>
println("PROCESSED: " + processed); println("SPAWNED: " + items);
Future.traverse(items)(getFuture) map (res => processed ::: res)
}
}
println(Await.result(futureIterator, Duration.Inf))
希望对您有所帮助!
您可以使用 Octoparts:
https://m3dev.github.io/octoparts/
但听起来您确实想要反转模式,因此 wsClient 在外部进行调用,然后平面映射 Future[WSResponse] 返回。这将限制 AsyncHttpClient 使用的内部 Netty 线程池的期货数量,您可以更改配置设置以增加或减少 Netty 通道池中的线程数量。