Akka http 流式传输响应 headers

Akka http streaming the response headers

根据定义,http 响应分为 3 个 partsstatus-code -> headers -> body,当执行 akka 客户端 http 请求时,在前两部分完成后接收到 http 响应 收到。

  val responseFuture: Future[HttpResponse]
  responseFuture.map {
    case HttpResponse(statusCode:StatusCode, headers:Seq[HttpHeader], entity:ResponseEntity, protocol:HttpProtocol)
  }

这对于大多数用例来说完全没问题,但在我的特定情况下,我需要在收到所有 headers 之前访问 headers(第三方服务器通过编写自定义返回进度progress headers 直到响应就绪)。有什么方法可以像访问 body 一样访问 headers 吗?

  val entity: ResponseEntity
  val entitySource:Source[ByteString, Any] = entity.dataBytes

在完美的世界中,也有一种方法可以访问 headers 作为来源

HttpResponse(statusCode:StatusCode, headers:Source[HttpHeader, NotUsed], entity:ResponseEntity, protocol:HttpProtocol)

不可能 akka-http

representation of HttpResponse 将 header 视为 Seq[HttpHeader] 而不是 Iteratorakka-stream Source。因此,正如问题中所解释的,如果没有所有 header 值首先可用,就不可能实例化 HttpResponse object。

我不知道这个设计决定背后的确切原因,但我怀疑这是因为很难支持 header 的来源和 body 的来源。如果不首先使用 header Source,将无法使用 body Source,因此必须严格按顺序访问响应的成员变量。这会导致混乱和意外错误。

低级处理 akka-stream

hypertext transfer protocol is just an application layer protocol, usually on top of TCP. And, it is a fairly simple message format:

The response message consists of the following:

  • A status line which includes the status code and reason message (e.g., HTTP/1.1 200 OK, which indicates that the client's request succeeded).
  • Response header fields (e.g., Content-Type: text/html).
  • An empty line.
  • An optional message body.

因此,您可以使用 Tcp 绑定来获取连接并解析消息 ByteString 自己获取来源以获取 headers:

val maximumFrameLength = 1024 * 1024

val endOfMessageLine : () => Byte => Boolean = () => {
  var previousWasCarriage = false

  (byte) => 
    if(byte == '\r') {
      previousWasCarriage = true
      false
    }
    else if(byte == '\n' && previousWasCarriage) {
      previousWasCarriage = false
      true
    }
    else {
      previousWasCarriage = false
      false
    }
}

def subFlow = 
  Flow[ByteString].flatMapConcat(str => Source.fromIterable(str))
                  .splitAfter(endOfMessageLine())

不幸的是,这可能还需要通过 Tcp 绑定将您的请求作为原始 ByteString 发送。