如何根据请求参数使用 Flows 在 akka-http 中引入非阻塞延迟

How to introduce a non blocking delay in akka-http using Flows based on the request parameters

我正在启动服务器并使用 akka stream 像这样处理它

connection.handleWith(handleRequest())

其中 handleRequest():Flow[HttpRequest,HttpRespnse,_] 我需要根据查询参数延迟将响应发送回客户端。我可以提取查询参数,但我不知道如何使用它来创建延迟。

看看 DelayFlow

https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/DelayFlow.scala

https://developer.lightbend.com/docs/api/akka-stream-contrib/current/akka/stream/contrib/DelayFlow.html

它应该允许您在流处理中引入延迟。

扩展@KnowsNotMuch 答案,假设您有一些标准来决定是否延迟请求:

val shouldDelayRequest : (HttpRequest) => Boolean = ???

您可以使用此决策器创建 DelayStrategy:

import scala.concurrent.duration.{FiniteDuration, TimeUnit}

val noDelay : FiniteDuration = FiniteDuration(0L, TimeUnit.SECONDS)

val createDelayStrategy : (FiniteDuration) => () => DelayStrategy[HttpRequest] = 
  (finiteDelay) => () => new DelayStrategy[HttpRequest] {
    override def nextDelay(elem: HttpRequest) : FiniteDuration = 
      if(shouldDelayRequest(elem))
        finiteDelay
      else
        noDelay
  }

您可以使用此函数创建 DelayFlow:

import akka.stream.contrib.DelayFlow

val delay = FiniteDuration(42L, TimeUnit.Seconds)

val delayFlow : DelayFlow[HttpRequest] = DelayFlow(createDelayStrategy(delay))

delayFlow 然后可以连接到将请求处理为响应所需的任何功能:

val requestToResponseFlow : Flow[HttpRequest, HttpResponse, _] = ???

val possibleDelayedResponseFlow  : Flow[HttpRequest, HttpResponse, _] = 
  delayFlow via requestToResponseFlow