如何用 Akka 延迟一秒来限制 Futures
How to throttle Futures with one-second delay with Akka
我有一个 URI 列表,我想请求每个 URI 之间有一秒的延迟。我该怎么做?
val uris: List[String] = List()
// How to make these URIs resolve 1 second apart?
val responses: List[Future[Response]] = uris.map(httpRequest(_))
可能是这样的:
@tailrec
def withDelay(
uris: Seq[String],
delay: Duration = 1 second,
result: List[Future[Response]] = Nil,
): Seq[Future[Response]] = uris match {
case Seq() => result.reversed
case (head, tail@_*) =>
val v = result.headOption.getOrElse(Future.successful(null))
.flatMap { _ =>
akka.pattern.after(delay, context.system.scheduler)(httpRequest(head))
}
withDelay(tail, delay, v :: result)
}
这在第一次执行之前也有延迟,但我希望,如果有必要,如何摆脱它已经足够清楚了......
另一个警告是,这假设所有期货都会成功。一旦失败,所有后续处理都会中止。
如果您需要不同的行为,您可能需要将 .flatMap
替换为 .transform
或添加 .recover
等
如果愿意,您也可以使用 .foldLeft
编写相同的内容:
uris.foldLeft(List.empty[Future[Response]]) { case (results, next) =>
results.headOption.getOrElse(Future.successful(null))
.flatMap { _ =>
akka.pattern.after(delay, context.system.scheduler)(httpRequest(next))
} :: results
}.reversed
akka 流有它 out of the box with the throttle
function(考虑到您正在使用 akka-http 并为 akka 流添加了标签)
您可以从 URI 列表创建 Akka Streams Source
,然后 throttle
将每个 URI 转换为 Future[Response]
:
def httpRequest(uri: String): Future[Response] = ???
val uris: List[String] = ???
val responses: Future[Seq[Response]] =
Source(uris)
.throttle(1, 1 second)
.mapAsync(parallelism = 1)(httpRequest)
.runWith(Sink.seq[Response])
我有一个 URI 列表,我想请求每个 URI 之间有一秒的延迟。我该怎么做?
val uris: List[String] = List()
// How to make these URIs resolve 1 second apart?
val responses: List[Future[Response]] = uris.map(httpRequest(_))
可能是这样的:
@tailrec
def withDelay(
uris: Seq[String],
delay: Duration = 1 second,
result: List[Future[Response]] = Nil,
): Seq[Future[Response]] = uris match {
case Seq() => result.reversed
case (head, tail@_*) =>
val v = result.headOption.getOrElse(Future.successful(null))
.flatMap { _ =>
akka.pattern.after(delay, context.system.scheduler)(httpRequest(head))
}
withDelay(tail, delay, v :: result)
}
这在第一次执行之前也有延迟,但我希望,如果有必要,如何摆脱它已经足够清楚了......
另一个警告是,这假设所有期货都会成功。一旦失败,所有后续处理都会中止。
如果您需要不同的行为,您可能需要将 .flatMap
替换为 .transform
或添加 .recover
等
如果愿意,您也可以使用 .foldLeft
编写相同的内容:
uris.foldLeft(List.empty[Future[Response]]) { case (results, next) =>
results.headOption.getOrElse(Future.successful(null))
.flatMap { _ =>
akka.pattern.after(delay, context.system.scheduler)(httpRequest(next))
} :: results
}.reversed
akka 流有它 out of the box with the throttle
function(考虑到您正在使用 akka-http 并为 akka 流添加了标签)
您可以从 URI 列表创建 Akka Streams Source
,然后 throttle
将每个 URI 转换为 Future[Response]
:
def httpRequest(uri: String): Future[Response] = ???
val uris: List[String] = ???
val responses: Future[Seq[Response]] =
Source(uris)
.throttle(1, 1 second)
.mapAsync(parallelism = 1)(httpRequest)
.runWith(Sink.seq[Response])