Akka Streams / HTTP:从响应中获取原始请求

Akka Streams / HTTP: Get original request from response

我有一个通过流并发布 HTTP 请求的 Akka Streams 源:

source.map(toRequest)
  .via(Http().outgoingConnection(host))
  .map(toMessage) 

假设 toRequest 方法将字符串映射到 HttpRequest,而 toMessage 方法将 HttpResponse 映射到消息 class 即需要下游处理。假设消息class需要包含一些原始信息

是否可以从 HttpResponse 中获取原始 HttpRequest?如果没有,有什么办法可以保留一些原始信息吗?

一种方法是使用客户端 API 的 Future-based variant 和包含要向下游传播的信息的自定义案例 class。例如:

case class Message(request: HttpRequest, response: HttpResponse)

source
  .map(toRequest)
  .mapAsync(parallelism = 3) { request => // adjust the level of parallelism as needed
    Http().singleRequest(request).map(response => Message(request, response))
  }
  // continue processing; at this point you have a Source[Message, _]

您可以使用图表 api 绕过 HttpRequest。一个例子是:

object Main extends App {

  implicit val as = ActorSystem("Test")
  implicit val m = ActorMaterializer()
  implicit val ec = as.dispatcher

  def src1 = Source.fromIterator(() => List(1, 2, 3).toIterator)

  val srcGraph = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Create one flow to prepend the 'Number' string to the input integer
    def flow1 = Flow[Int].map { el => s"Number $el"  }

    // Create a broadcast stage 
    val broadcast = builder.add(Broadcast[Int](2))
    val zip       = builder.add(Zip[Int, String]())

    src1 ~> broadcast.in

    // The 0 port sends the int to the zip stage directly
    broadcast.out(0) ~>          zip.in0
    broadcast.out(1) ~> flow1 ~> zip.in1

    SourceShape(zip.out)
  })

  Source.fromGraph(srcGraph).runForeach(println(_))
}

图表 api 提供了很多选项来做这样的事情。