Scala 使用 Futures 进行并行网络调用
Scala making parallel network calls using Futures
我是 Scala 的新手,我有一个方法,它从给定的文件列表中读取数据并 api 调用
数据,并将响应写入文件。
listOfFiles.map { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
上面的方法,在网络调用的时候太费时间了,所以尝试使用并行
处理以减少时间。
所以我尝试包装代码块,这会消耗更多时间,但程序很快结束
并且它没有生成任何输出,如上面的代码。
import scala.concurrent.ExecutionContext.Implicits.global
listOfFiles.map { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
Future {
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
}
如果您有任何建议,这将很有帮助。
(我也试过使用 "par",效果很好,
我正在探索 'par' 以外的其他选项,并使用 'akka'、'cats' 等框架)
基于而不是使用包含守护线程的默认执行上下文
import scala.concurrent.ExecutionContext.Implicits.global
使用非守护线程定义执行上下文
implicit val nonDeamonEc = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
您也可以像这样使用 Future.traverse
和 Await
val resultF = Future.traverse(listOfFiles) { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
Future {
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
}
Await.result(resultF, Duration.Inf)
traverse
将 List[Future[A]]
转换为 Future[List[A]]
。
我是 Scala 的新手,我有一个方法,它从给定的文件列表中读取数据并 api 调用 数据,并将响应写入文件。
listOfFiles.map { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
上面的方法,在网络调用的时候太费时间了,所以尝试使用并行 处理以减少时间。
所以我尝试包装代码块,这会消耗更多时间,但程序很快结束 并且它没有生成任何输出,如上面的代码。
import scala.concurrent.ExecutionContext.Implicits.global
listOfFiles.map { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
Future {
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
}
如果您有任何建议,这将很有帮助。 (我也试过使用 "par",效果很好, 我正在探索 'par' 以外的其他选项,并使用 'akka'、'cats' 等框架)
基于
import scala.concurrent.ExecutionContext.Implicits.global
使用非守护线程定义执行上下文
implicit val nonDeamonEc = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
您也可以像这样使用 Future.traverse
和 Await
val resultF = Future.traverse(listOfFiles) { file =>
val bufferedSource = Source.fromFile(file)
val data = bufferedSource.mkString
bufferedSource.close()
Future {
val response = doApiCall(data) // time consuming task
if (response.nonEmpty) writeFile(response, outputLocation)
}
}
Await.result(resultF, Duration.Inf)
traverse
将 List[Future[A]]
转换为 Future[List[A]]
。