Akka Stream HTTP rate limiting
One stage of my computation graph is a flow of type Flow[Seq[Request], Seq[Response], NotUsed]. Obviously, this stage should produce one Response for each Request and emit the Seq once all of its requests have been resolved.
Now, the underlying API has a strict rate-limiting policy, so I can only fire one request per second. If I had a Flow of individual Requests, I could zip that stream with one that emits an element every second, but I don't see a similar solution for this case.
Is there a good way to express this? The idea that comes to mind is to drop down to the low-level Graph DSL and keep a one-second tick stream as state there, using it to work through each sequence of requests, but I suspect it won't end up looking pretty.
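For reference, the zip-with-a-tick-stream approach described above for a flow of individual requests might be sketched like this (Request is a hypothetical stand-in for the real request type):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

// Hypothetical stand-in for the real request type.
case class Request(id: Int)

// Zip only emits when both inputs have an element available, so requests
// are paced by the one-per-second ticks.
val oneRequestPerSecond: Flow[Request, Request, NotUsed] =
  Flow[Request]
    .zip(Source.tick(1.second, 1.second, ()))
    .map { case (request, _) => request }
```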
As Victor said, you should probably use the built-in throttle. But if you want to do it yourself, it could look something like this:
private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate, rate, ())
  val zip = builder.add(Zip[T, Unit]())
  val map = Flow[(T, Unit)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(zip.in0, messageExtractor.out)
})
// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))
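For comparison, the "built-in throttle" mentioned above is a one-liner in recent Akka Streams versions (a sketch; builtInThrottle is my name for it, not an Akka API):

```scala
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow

// At most one element per `rate`, no bursting (maximumBurst = 1), and
// ThrottleMode.shaping backpressures upstream instead of failing the stream.
def builtInThrottle[T](rate: FiniteDuration): Flow[T, T, NotUsed] =
  Flow[T].throttle(1, rate, 1, ThrottleMode.shaping)
```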
Also, since you are rate-limited in your access to some API, you may want to throttle calls to it in a centralized way. Say your project has multiple places that call the same external API, but because the calls all originate from the same IP, the limit should apply to them combined. For this case, consider using MergeHub.source as the entry point to your (single, shared) akka-http flow: each caller creates and runs a new graph that feeds its calls into it.
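A centralized limiter along these lines might be sketched as follows (MergeHub.source, throttle, and Sink.foreach are real Akka Streams APIs; the names centralSink and calls are illustrative, and this assumes Akka 2.6+ where an implicit ActorSystem provides the materializer):

```scala
import scala.concurrent.duration._
import java.util.concurrent.ConcurrentLinkedQueue
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{MergeHub, Sink, Source}

implicit val system: ActorSystem = ActorSystem("rate-limited")

// Stand-in for the real API call; here we just record what would be sent.
val calls = new ConcurrentLinkedQueue[String]()

// One long-running graph shared by the whole application: everything fed
// into `centralSink`, from any caller, passes through the single throttle.
val centralSink: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 16)
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .to(Sink.foreach(request => calls.add(request)))
    .run()

// Each caller attaches its own short-lived stream to the shared sink.
Source.single("request-from-caller-A").runWith(centralSink)
```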
Here is what I ended up using:
case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
  def withResponse(resp: String) = copy(responses = resp +: responses)
  def extractNextRequest = (requests.head, copy(requests = requests.tail))
}
import scala.concurrent.duration._
import scala.util.{Success, Try}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.scaladsl.MergePreferred.MergePreferredShape

def apiFlow[I, O](requestPer: FiniteDuration,
                  buildRequests: I => Seq[HttpRequest],
                  buildOut: (I, Seq[String]) => O
                 )(implicit system: ActorSystem, materializer: ActorMaterializer) = {
  import system.dispatcher

  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in: FlowShape[I, FlowItem[I]] =
      b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))

    val merge: MergePreferredShape[FlowItem[I]] =
      b.add(MergePreferred[FlowItem[I]](1))

    val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
      b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))

    val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
      b.add(Flow[FlowItem[I]].map(_.extractNextRequest))

    val log =
      b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"request to ${r._1.uri}"); r })

    val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
      b.add(Http(system).superPool[FlowItem[I]]())

    // NB: only successful 200 OK responses are handled here; a failed request
    // or any other status code will fail the stream with a MatchError.
    val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
      b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
        case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
          entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
      })

    val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
      b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))

    val out: FlowShape[FlowItem[I], O] =
      b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))

    in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
          merge.preferred <~ split

    FlowShape(in.in, out.out)
  }
}
The idea is to pass each element through the throttled section as many times as it carries requests, storing the extra (not-yet-executed) requests alongside the message. The split stage checks whether any requests remain: elements with requests left are fed back into the merge, while the rest move on to out.
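To use apiFlow, wrap the graph in Flow.fromGraph (a sketch: the String-based types and the example URI are illustrative, and this assumes the definitions above plus an implicit ActorSystem and ActorMaterializer are in scope):

```scala
// Hypothetical wiring: each input string produces a single GET request,
// and the output pairs the input with the collected response bodies.
val callApi: Flow[String, (String, Seq[String]), NotUsed] =
  Flow.fromGraph(apiFlow[String, (String, Seq[String])](
    requestPer    = 1.second,
    buildRequests = s => Seq(HttpRequest(uri = "https://example.com/api")),
    buildOut      = (s, responses) => (s, responses)
  ))

// Source(List("a", "b")).via(callApi).runWith(Sink.seq)
```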