How to make parallel HTTP requests using Akka-Http?
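I am new to Scala and am trying to implement a library that will be handed thousands of URLs. My job is to download the content from those URLs. I would have gone with the simple scalaj-http library, but it does not serve my purpose.
The code I have come up with is this: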
class ProxyHttpClient {
  def get(url: String, proxy: ProxySettings, urlDownloaderConfig: UrlDownloaderConfig)
         (implicit ec: ExecutionContext): Either[HttpError, HttpSuccessResponse] = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val auth = headers.BasicHttpCredentials(proxy.userName, proxy.secret)
    val httpsProxyTransport =
      ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(proxy.host, proxy.port), auth)
    val settings = ConnectionPoolSettings(system).withTransport(httpsProxyTransport)
    val response: Future[HttpResponse] =
      Http().singleRequest(HttpRequest().withMethod(HttpMethods.GET).withUri(url), settings = settings)
    val data: Future[Either[HttpError, HttpSuccessResponse]] = response.map {
      case response@HttpResponse(StatusCodes.OK, _, _, _) => {
        val content: Future[String] = Unmarshal(response.entity).to[String]
        val finalContent = Await.ready(content, timeToWaitForContent).value.get.get.getBytes
        Right(HttpSuccessResponse(url, response.status.intValue(), finalContent))
      }
      case errorResponse@HttpResponse(StatusCodes.GatewayTimeout, _, _, _) =>
        Left(HttpError(url, errorResponse.status.intValue(), errorResponse.entity.toString))
    }
    val result: Try[Either[HttpError, HttpSuccessResponse]] = Await.ready(data, timeToWaitForResponse).value.get
    val pop: Either[HttpError, HttpSuccessResponse] = try {
      result.get
    } catch {
      case e: Exception => Left(HttpError(url, HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage))
    }
    pop
  }
}
To call the get method, I am using:
val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool(8)
picList.par.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
picList.par.map(testUrl => {
  val resp = get(url, Option(proxy))
})
It runs fine a few times, but when I call the method for 1000 URLs, fetching images in batches of 100, it throws the error below. After that, I get the same error even for a single URL.
**java.lang.OutOfMemoryError: unable to create new native thread**
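Should I be using actors here instead of an ActorSystem, and dedicate a separate dispatcher to them?
Since the image content I am holding is binary, do I have to remove it from memory once it has served its purpose?
A code snippet would be most helpful. Thanks in advance.
I tried following advice I found online, where people suggest using: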
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
But when I try it, system.dispatchers.lookup returns a value of type MessageDispatcher.
implicit val system: ActorSystem = ActorSystem()
val ex: MessageDispatcher =system.dispatchers.lookup("io-blocking-dispatcher")
Am I missing a library or an import?
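On the type question above: as far as I know nothing is missing from the imports. In Akka, `MessageDispatcher` is itself an `ExecutionContext`, so the value returned by `lookup` can be assigned to an `ExecutionContext` and used for futures. What does need to exist is a configuration entry for the id being looked up, otherwise `lookup` fails. A minimal sketch, assuming a dispatcher called `blocking-dispatcher` with a fixed pool of 8 threads (the object name, the inline config, and the pool size are all made up for illustration; normally the config would live in application.conf):

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DispatcherLookupSketch {
  // Hypothetical dispatcher definition, inlined here so the sketch is self-contained.
  private val config = ConfigFactory.parseString(
    """
      |blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    fixed-pool-size = 8
      |  }
      |  throughput = 1
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def run(): Int = {
    val system = ActorSystem("dispatcher-sketch", config)
    try {
      // lookup returns a MessageDispatcher; widening it to ExecutionContext compiles
      // because MessageDispatcher extends ExecutionContextExecutor.
      implicit val blockingEc: ExecutionContext =
        system.dispatchers.lookup("blocking-dispatcher")

      // The future body runs on the looked-up dispatcher.
      Await.result(Future(21 * 2), 5.seconds)
    } finally {
      // Shut the system down so its threads are released.
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

A dedicated dispatcher like this only isolates blocking calls; it does not by itself fix the thread exhaustion described in the question.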
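Your problem is most likely caused by creating an actor system for every HTTP call. Each ActorSystem starts its own thread pools, and since none of them is ever terminated, the threads pile up until the JVM cannot create any more. There is usually one actor system per application.
Do a small refactoring and try it: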
class ProxyHttpClient() {
  private implicit val system: ActorSystem = ActorSystem()
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  def get(url: String, proxy: ProxySettings, urlDownloaderConfig: UrlDownloaderConfig)
         (implicit ec: ExecutionContext): Either[HttpError, HttpSuccessResponse] = {???}
}
Or extract the actor system and pass it in as an implicit parameter:
class ProxyHttpClient() {
  def get(url: String, proxy: ProxySettings, urlDownloaderConfig: UrlDownloaderConfig)
         (implicit ec: ExecutionContext, system: ActorSystem, materializer: ActorMaterializer): Either[HttpError, HttpSuccessResponse] = {???}
}
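Once the actor system is shared, it may also be worth dropping the `Await` calls and the parallel collection, and letting Akka Streams bound the number of in-flight requests. The following is only a sketch of that idea, not a drop-in replacement: `ParallelDownloader`, `DownloadError`, `Downloaded`, the 30-second entity timeout, and the `parallelism` parameter are names and values I made up, standing in for the `HttpError` / `HttpSuccessResponse` types from the question. The `settings` argument is where the proxy transport from the question would go.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical result types, stand-ins for the question's HttpError / HttpSuccessResponse.
final case class DownloadError(url: String, status: Int, message: String)
final case class Downloaded(url: String, status: Int, content: Array[Byte])

object ParallelDownloader {
  type Result = Either[DownloadError, Downloaded]
}

// One instance, one shared ActorSystem: no per-request systems or thread pools.
class ParallelDownloader(parallelism: Int, settings: ConnectionPoolSettings)
                        (implicit system: ActorSystem, mat: Materializer) {
  import ParallelDownloader.Result
  import system.dispatcher // ExecutionContext for the Future callbacks below

  // Non-blocking download of a single URL; the caller gets a Future, never an Await.
  def fetch(url: String): Future[Result] =
    Http().singleRequest(HttpRequest(uri = url), settings = settings)
      .flatMap[Result] { response =>
        response.status match {
          case StatusCodes.OK =>
            // Read the body as raw bytes rather than going through a String.
            response.entity.toStrict(30.seconds)
              .map(strict => Right(Downloaded(url, response.status.intValue, strict.data.toArray)))
          case other =>
            // Unread entities must be drained, or the pooled connection stays busy.
            response.discardEntityBytes()
            Future.successful(Left(DownloadError(url, other.intValue, other.reason)))
        }
      }
      .recover { case e: Exception => Left(DownloadError(url, 500, String.valueOf(e.getMessage))) }

  // Runs `download` over all URLs with at most `parallelism` calls in flight;
  // mapAsync keeps the results in the same order as the input.
  def runAll(urls: List[String])(download: String => Future[Result]): Future[Seq[Result]] =
    Source(urls).mapAsync(parallelism)(download).runWith(Sink.seq)

  def fetchAll(urls: List[String]): Future[Seq[Result]] = runAll(urls)(fetch)
}
```

With this shape, `fetchAll(picList)` returns a single `Future` that the application can wait on once at its edge. Collecting everything with `Sink.seq` still holds all downloaded bytes in memory at the same time, so for large images it may be preferable to write each result out (for example with `Sink.foreach`) so the arrays can be garbage collected as soon as they are no longer referenced.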