Akka Stream + Akka Http - 获取错误请求

Akka Stream + Akka Http - Get Request on Error

我有以下流效果很好:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

和一个简单的 actor 来处理流完成时的响应状态和最终消息:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

我不知道这是否是计算事物和处理响应状态的最佳方法,但它目前有效。

问题是如何以一种我可以知道是什么请求导致了特定错误的方式将原始请求传递给参与者。

我的演员可以期待以下内容:

case (request: String, response: HttpResponse) =>

但是如何传递我在管道开始时拥有的信息?

我想 map 是这样的:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

但我不知道如何触发 Http 流。

有什么建议吗?

在@cmbaxter 的帮助下,我可以使用以下代码解决我的问题:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

现在我的演员可以收到这个了:

case (respTry: Try[HttpResponse], request: String) =>