Scala:在 Map 值中重复执行方法
Scala: duplicate execution of method in Map values
我已经实现了一个 Scala 函数,该函数使用来自 Apache HttpComponents 的 HttpClient v4.5.3 执行 HTTP 请求。这个想法是解析响应并检查它是否正常(即 return 代码 200)。如果不是,请重试n次。
当响应正常时,一切正常。
我可以从记录器输出中看到,如果一开始出现 HTTP 错误,也会按预期执行重试。但是,当它失败n次时,似乎又重新开始了整个过程。
这是执行请求的函数:
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpUriRequest}
[...]
class SearchQuery {
[...]
def executeRequest(retries: Int = 2, delay: Int = 1): Option[SearchResponse] = {
val request: HttpUriRequest = makeRequest
val response: CloseableHttpResponse = config.httpClient.execute(request)
val searchResults: Option[SearchResponse] = parseResponse(response)
response.close()
if (searchResults.isEmpty && retries > 0) {
logger.warn(s"Failed to retrieve response from ${source}. " +
s"Retrying $retries more time(s) in $delay second(s)...")
Thread.sleep(delay * 1000)
executeRequest(retries - 1, delay)
}
else searchResults
}
[...]
}
方法 makeRequest()
生成 HTTP 请求。 config.httpClient
提供可重复使用的 HttpClient
实例。
除了递归中的调用外,executeRequests()
的调用如下所示。这个想法是拥有一个或多个服务器(源),由调用者定义,并行执行调用;它们以 enum
类型 Source
定义。 queryFromfile()
读取给定文件的内容以生成 SearchQuery
对象,然后从中调用 executeRequest
。
val sources: Set[Source] = Set(Source.1)
val file: File = ...
val responses: parallel.ParMap[Source, Option[SearchResponse]] = sources
.par
.map(source => (source, SearchQuery.queryFromFile(file, source)))
.toMap
.mapValues(query => query.executeRequest())
在此示例中,executeRequest
应该为每个来源调用一次。一个源通过良好,另一个预计在这里失败。
来自日志:
14:56:48.656 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)...
14:57:07.136 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)...
14:57:25.538 [ScalaTest-run-running-FileProcessorTest] ERROR process.FileProcessor - Failed to retrieve results for file 'XXX' from sources: source1. Continuing.
14:57:40.933 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)...
14:57:59.214 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)...
使用 IntelliJ 调试器,逻辑看起来也和最初的预期一样,在 n 次尝试后在 else searchResults
行结束。但是,执行器实际上并没有 return 处理结果,而是跳回 if
子句,进入递归调用 executeRequest(retries - 1, delay)
.
更新:
我发现此行为是由 responses
val:
上的后续操作引起的
val emptyResponses = responses.filter(_._2.isEmpty)
if (emptyResponses.nonEmpty)
logger.error(
s"Failed to retrieve results for '$fileName' from sources: " +
s"${emptyResponses.keys.mkString(",")}. Continuing."
)
在这里,我检查是否有任何调用失败,如果失败则记录错误。我不明白为什么这会触发 executeRequest()
第二次调用。为什么会这样?
此外,下行不会导致对 executeRequest()
的第二次调用,即使它在概念上看起来非常相似:
responses.values
.filter(response => response.isDefined)
.map(response => response.get)
之所以在访问值时再次执行executeResponse()
是因为每次访问Scala Map的值都会重新计算。
responses
因此需要建模为元组 (source, Option[SearchResponse]
:
val responses = sources
.par
.map(source => (source, SearchQuery.queryFromFile(file, source)))
.map(pair => (pair._1, pair._2.executeRequest()))
这基本上消除了 mapValues
的简洁性,但导致元组中第二个元素的值固定。
我已经实现了一个 Scala 函数,该函数使用来自 Apache HttpComponents 的 HttpClient v4.5.3 执行 HTTP 请求。这个想法是解析响应并检查它是否正常(即 return 代码 200)。如果不是,请重试n次。
当响应正常时,一切正常。 我可以从记录器输出中看到,如果一开始出现 HTTP 错误,也会按预期执行重试。但是,当它失败n次时,似乎又重新开始了整个过程。
这是执行请求的函数:
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpUriRequest}
[...]
class SearchQuery {
[...]
def executeRequest(retries: Int = 2, delay: Int = 1): Option[SearchResponse] = {
val request: HttpUriRequest = makeRequest
val response: CloseableHttpResponse = config.httpClient.execute(request)
val searchResults: Option[SearchResponse] = parseResponse(response)
response.close()
if (searchResults.isEmpty && retries > 0) {
logger.warn(s"Failed to retrieve response from ${source}. " +
s"Retrying $retries more time(s) in $delay second(s)...")
Thread.sleep(delay * 1000)
executeRequest(retries - 1, delay)
}
else searchResults
}
[...]
}
方法 makeRequest()
生成 HTTP 请求。 config.httpClient
提供可重复使用的 HttpClient
实例。
除了递归中的调用外,executeRequests()
的调用如下所示。这个想法是拥有一个或多个服务器(源),由调用者定义,并行执行调用;它们以 enum
类型 Source
定义。 queryFromfile()
读取给定文件的内容以生成 SearchQuery
对象,然后从中调用 executeRequest
。
val sources: Set[Source] = Set(Source.1)
val file: File = ...
val responses: parallel.ParMap[Source, Option[SearchResponse]] = sources
.par
.map(source => (source, SearchQuery.queryFromFile(file, source)))
.toMap
.mapValues(query => query.executeRequest())
在此示例中,executeRequest
应该为每个来源调用一次。一个源通过良好,另一个预计在这里失败。
来自日志:
14:56:48.656 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)...
14:57:07.136 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)...
14:57:25.538 [ScalaTest-run-running-FileProcessorTest] ERROR process.FileProcessor - Failed to retrieve results for file 'XXX' from sources: source1. Continuing.
14:57:40.933 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)...
14:57:59.214 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)...
使用 IntelliJ 调试器,逻辑看起来也和最初的预期一样,在 n 次尝试后在 else searchResults
行结束。但是,执行器实际上并没有 return 处理结果,而是跳回 if
子句,进入递归调用 executeRequest(retries - 1, delay)
.
更新:
我发现此行为是由 responses
val:
val emptyResponses = responses.filter(_._2.isEmpty)
if (emptyResponses.nonEmpty)
logger.error(
s"Failed to retrieve results for '$fileName' from sources: " +
s"${emptyResponses.keys.mkString(",")}. Continuing."
)
在这里,我检查是否有任何调用失败,如果失败则记录错误。我不明白为什么这会触发 executeRequest()
第二次调用。为什么会这样?
此外,下行不会导致对 executeRequest()
的第二次调用,即使它在概念上看起来非常相似:
responses.values
.filter(response => response.isDefined)
.map(response => response.get)
之所以在访问值时再次执行executeResponse()
是因为每次访问Scala Map的值都会重新计算。
responses
因此需要建模为元组 (source, Option[SearchResponse]
:
val responses = sources
.par
.map(source => (source, SearchQuery.queryFromFile(file, source)))
.map(pair => (pair._1, pair._2.executeRequest()))
这基本上消除了 mapValues
的简洁性,但导致元组中第二个元素的值固定。