Akka http 流式传输响应 headers
Akka http streaming the response headers
根据定义,http 响应分为 3 个 parts、status-code -> headers -> body
,当执行 akka 客户端 http 请求时,在前两部分完成后接收到 http 响应
收到。
val responseFuture: Future[HttpResponse]
responseFuture.map {
case HttpResponse(statusCode:StatusCode, headers:Seq[HttpHeader], entity:ResponseEntity, protocol:HttpProtocol)
}
这对于大多数用例来说完全没问题,但在我的特定情况下,我需要在收到所有 headers 之前访问 headers(第三方服务器通过编写自定义返回进度progress headers 直到响应就绪)。有什么方法可以像访问 body 一样访问 headers 吗?
val entity: ResponseEntity
val entitySource:Source[ByteString, Any] = entity.dataBytes
在完美的世界中,也有一种方法可以访问 headers 作为来源
HttpResponse(statusCode:StatusCode, headers:Source[HttpHeader, NotUsed], entity:ResponseEntity, protocol:HttpProtocol)
不可能 akka-http
representation of HttpResponse
将 header 视为 Seq[HttpHeader]
而不是 Iterator
或 akka-stream
Source
。因此,正如问题中所解释的,如果没有所有 header 值首先可用,就不可能实例化 HttpResponse object。
我不知道这个设计决定背后的确切原因,但我怀疑这是因为很难支持 header 的来源和 body 的来源。如果不首先使用 header Source,将无法使用 body Source,因此必须严格按顺序访问响应的成员变量。这会导致混乱和意外错误。
低级处理 akka-stream
hypertext transfer protocol is just an application layer protocol, usually on top of TCP. And, it is a fairly simple message format:
The response message consists of the following:
- A status line which includes the status code and reason message (e.g.,
HTTP/1.1 200 OK, which indicates that the client's request succeeded).
- Response header fields (e.g., Content-Type: text/html).
- An empty line.
- An optional message body.
因此,您可以使用 Tcp
绑定来获取连接并解析消息 ByteString
自己获取来源以获取 headers:
val maximumFrameLength = 1024 * 1024
val endOfMessageLine : () => Byte => Boolean = () => {
var previousWasCarriage = false
(byte) =>
if(byte == '\r') {
previousWasCarriage = true
false
}
else if(byte == '\n' && previousWasCarriage) {
previousWasCarriage = false
true
}
else {
previousWasCarriage = false
false
}
}
def subFlow =
Flow[ByteString].flatMapConcat(str => Source.fromIterable(str))
.splitAfter(endOfMessageLine())
不幸的是,这可能还需要通过 Tcp 绑定将您的请求作为原始 ByteString 发送。
根据定义,http 响应分为 3 个 parts、status-code -> headers -> body
,当执行 akka 客户端 http 请求时,在前两部分完成后接收到 http 响应
收到。
val responseFuture: Future[HttpResponse]
responseFuture.map {
case HttpResponse(statusCode:StatusCode, headers:Seq[HttpHeader], entity:ResponseEntity, protocol:HttpProtocol)
}
这对于大多数用例来说完全没问题,但在我的特定情况下,我需要在收到所有 headers 之前访问 headers(第三方服务器通过编写自定义返回进度progress headers 直到响应就绪)。有什么方法可以像访问 body 一样访问 headers 吗?
val entity: ResponseEntity
val entitySource:Source[ByteString, Any] = entity.dataBytes
在完美的世界中,也有一种方法可以访问 headers 作为来源
HttpResponse(statusCode:StatusCode, headers:Source[HttpHeader, NotUsed], entity:ResponseEntity, protocol:HttpProtocol)
不可能 akka-http
representation of HttpResponse
将 header 视为 Seq[HttpHeader]
而不是 Iterator
或 akka-stream
Source
。因此,正如问题中所解释的,如果没有所有 header 值首先可用,就不可能实例化 HttpResponse object。
我不知道这个设计决定背后的确切原因,但我怀疑这是因为很难支持 header 的来源和 body 的来源。如果不首先使用 header Source,将无法使用 body Source,因此必须严格按顺序访问响应的成员变量。这会导致混乱和意外错误。
低级处理 akka-stream
hypertext transfer protocol is just an application layer protocol, usually on top of TCP. And, it is a fairly simple message format:
The response message consists of the following:
- A status line which includes the status code and reason message (e.g., HTTP/1.1 200 OK, which indicates that the client's request succeeded).
- Response header fields (e.g., Content-Type: text/html).
- An empty line.
- An optional message body.
因此,您可以使用 Tcp
绑定来获取连接并解析消息 ByteString
自己获取来源以获取 headers:
val maximumFrameLength = 1024 * 1024
val endOfMessageLine : () => Byte => Boolean = () => {
var previousWasCarriage = false
(byte) =>
if(byte == '\r') {
previousWasCarriage = true
false
}
else if(byte == '\n' && previousWasCarriage) {
previousWasCarriage = false
true
}
else {
previousWasCarriage = false
false
}
}
def subFlow =
Flow[ByteString].flatMapConcat(str => Source.fromIterable(str))
.splitAfter(endOfMessageLine())
不幸的是,这可能还需要通过 Tcp 绑定将您的请求作为原始 ByteString 发送。