如何使用 Akka HTTP 从多个参与者/网络处理程序正确调用单个服务器?
How to properly call a single server from multiple actors / web handlers using Akka HTTP?
我有一个服务(我们称之为服务 A),它使用 Akka Server HTTP 来处理传入的请求。
我还有提供多种 Web 服务的第 3 方应用程序(服务 B)。
服务 A 的目的是转换客户端请求,调用服务 B 的一个或多个 Web 服务,merge/transform 结果并将其返回给客户端。
我在某些部分使用 Actors,而在其他部分使用 Future。
要调用服务 B,我使用 Akka HTTP 客户端。
Http.get(actorSystem).singleRequest(HttpRequest.create()
.withUri("http://127.0.0.1:8082/test"), materializer)
.onComplete(...)
问题是,每个服务 A 请求都会创建一个新流,如果有多个并发连接,则会导致 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error
我已经问过这个问题并得到了使用单一流程的建议
虽然它适用于来自一个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个流程。
正确的"Akka-way"怎么做?
您需要做的就是在您的服务 A 代码中为服务 B 设置 HostConnectionPool。这将为您提供一个 Flow
,它可以添加到您的服务 A 流中,以使用连接池而不是每个流的新连接将请求从 A 分派到 B。来自文档:
As opposed to the Connection-Level Client-Side API the host-level API
relieves you from manually managing individual HTTP connections. It
autonomously manages a configurable pool of connections to one
particular target endpoint (i.e. host/port combination).
此流在不同流中的每个具体化都将从该底层连接池中提取:
The best way to get a hold of a connection pool to a given target
endpoint is the Http.get(system).cachedHostConnectionPool(...)
method,
which returns a Flow
that can be "baked" into an application-level
stream setup. This flow is also called a "pool client flow".
这是
的 Java 版本
final Flow<
Pair<HttpRequest, Promise<HttpResponse>>,
Pair<Try<HttpResponse>, Promise<HttpResponse>>,
NotUsed> flow =
Http.get(actorSystem).superPool(materializer);
final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue = Source.<Pair<HttpRequest, Promise<HttpResponse>>>
queue(BUFFER_SIZE, OverflowStrategy.dropNew())
.via(flow)
.toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
.run(materializer);
...
public CompletionStage<HttpResponse> request(HttpRequest request) {
log.debug("Making request {}", request);
Promise<HttpResponse> promise = Futures.promise();
return queue.offer(Pair.create(request, promise))
.thenCompose(buffered -> {
if (buffered instanceof QueueOfferResult.Enqueued$) {
return FutureConverters.toJava(promise.future())
.thenApply(resp -> {
if (log.isDebugEnabled()) {
log.debug("Got response {} {}", resp.status(), resp.getHeaders());
}
return resp;
});
} else {
log.error("Could not buffer request {}", request);
return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
}
});
}
我认为您可以使用 Source.queue
来缓冲您的请求。下面的代码假定您需要从 3rd 方服务获得答案,因此非常欢迎 Future[HttpResponse]
。这样你也可以提供一个溢出策略来防止资源匮乏。
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})
Await.ready(response, 3 seconds)
(从我的 blog post 复制的代码)
我有一个服务(我们称之为服务 A),它使用 Akka Server HTTP 来处理传入的请求。 我还有提供多种 Web 服务的第 3 方应用程序(服务 B)。 服务 A 的目的是转换客户端请求,调用服务 B 的一个或多个 Web 服务,merge/transform 结果并将其返回给客户端。
我在某些部分使用 Actors,而在其他部分使用 Future。 要调用服务 B,我使用 Akka HTTP 客户端。
Http.get(actorSystem).singleRequest(HttpRequest.create()
.withUri("http://127.0.0.1:8082/test"), materializer)
.onComplete(...)
问题是,每个服务 A 请求都会创建一个新流,如果有多个并发连接,则会导致 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error
我已经问过这个问题并得到了使用单一流程的建议
虽然它适用于来自一个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个流程。
正确的"Akka-way"怎么做?
您需要做的就是在您的服务 A 代码中为服务 B 设置 HostConnectionPool。这将为您提供一个 Flow
,它可以添加到您的服务 A 流中,以使用连接池而不是每个流的新连接将请求从 A 分派到 B。来自文档:
As opposed to the Connection-Level Client-Side API the host-level API relieves you from manually managing individual HTTP connections. It autonomously manages a configurable pool of connections to one particular target endpoint (i.e. host/port combination).
此流在不同流中的每个具体化都将从该底层连接池中提取:
The best way to get a hold of a connection pool to a given target endpoint is the
Http.get(system).cachedHostConnectionPool(...)
method, which returns aFlow
that can be "baked" into an application-level stream setup. This flow is also called a "pool client flow".
这是
final Flow<
Pair<HttpRequest, Promise<HttpResponse>>,
Pair<Try<HttpResponse>, Promise<HttpResponse>>,
NotUsed> flow =
Http.get(actorSystem).superPool(materializer);
final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue = Source.<Pair<HttpRequest, Promise<HttpResponse>>>
queue(BUFFER_SIZE, OverflowStrategy.dropNew())
.via(flow)
.toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
.run(materializer);
...
public CompletionStage<HttpResponse> request(HttpRequest request) {
log.debug("Making request {}", request);
Promise<HttpResponse> promise = Futures.promise();
return queue.offer(Pair.create(request, promise))
.thenCompose(buffered -> {
if (buffered instanceof QueueOfferResult.Enqueued$) {
return FutureConverters.toJava(promise.future())
.thenApply(resp -> {
if (log.isDebugEnabled()) {
log.debug("Got response {} {}", resp.status(), resp.getHeaders());
}
return resp;
});
} else {
log.error("Could not buffer request {}", request);
return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
}
});
}
我认为您可以使用 Source.queue
来缓冲您的请求。下面的代码假定您需要从 3rd 方服务获得答案,因此非常欢迎 Future[HttpResponse]
。这样你也可以提供一个溢出策略来防止资源匮乏。
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})
Await.ready(response, 3 seconds)
(从我的 blog post 复制的代码)