什么时候在 akka-http 中实现 http 响应的内容?
When to materialize content of http response in akka-http?
让我们想象一下基于 akka-streams 和 akka-http 的代理应用程序,它接收(作为 TCP 服务器)一些本地格式的消息,从它们发出 http 请求,询问其他一些 http 服务器,将 http 响应转换回本地格式并回复给客户。简化代码如下:
// as Client part
val connPool = Http().cachedHostConnectionPool[CustHttpReq](someHost, somePort)
val asClientFlow = Flow[CustHttpReq]
.via (connPool)
.map (procHttpResp)
def procHttpResp (p: (Try[HttpResponse], CustHttpReq)): Future[ByteString] = {
val (rsp, src) = p
rsp match {
case Success(response: HttpResponse) =>
for (buf <- cvtToHomeGrown (response, src))
yield buf
case Failure(ex) => ...
}
}
def cvtToHomeGrown (rsp: HttpResponse): Future[ByteString] = {
rsp.entity.dataBytes.runWith (Sink.fold (ByteString.empty)(_ ++ _))
.map (cvtToHomeGrownActually) // has signature String => ByteString
}
// as Server part
val parseAndAskFlow = Flow[ByteString]
.via(Framing.delimiter(
ByteString('\n'))
.map (buf => cvtToCustHttpReq (buf))
.via (asClientFlow) // plug-in asClient part, the problem is here
val asServerConn: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("localhost",port)
asServerConn.runForeach (conn => conn.handleWith(parseAndAskFlow)
问题是 conn.handleWith 需要 Flow[ByteString,ByteString,],但是 http 客户端代码 (rsp.entity.dataBytes...) returns Future[ByteSring],所以 parseAndAskFlow 有Flow[ByteString,Future[ByteString],] 类型,我不知道在哪里可以更好地完成它。我什至猜想这根本不是一个好主意,因为所有这些都是流,并且 Await somethere 会停止很好的异步处理,但代码不会被编译。
使用 mapAsync
而不是 map
将 asClientFlow
的类型更改为 Flow[CustHttpReq, ByteString]
:
val asClientFlow: Flow[CustHttpReq, ByteString] =
Flow[CustHttpReq]
.via(connPool)
.mapAsync(1)(procHttpResp)
那么parseAndAskFlow
可以是类型Flow[ByteString, ByteString]
:
val parseAndAskFlow: Flow[ByteString, ByteString] =
Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"))
.map(cvtToCustHttpReq)
.via(asClientFlow)
让我们想象一下基于 akka-streams 和 akka-http 的代理应用程序,它接收(作为 TCP 服务器)一些本地格式的消息,从它们发出 http 请求,询问其他一些 http 服务器,将 http 响应转换回本地格式并回复给客户。简化代码如下:
// as Client part
val connPool = Http().cachedHostConnectionPool[CustHttpReq](someHost, somePort)
val asClientFlow = Flow[CustHttpReq]
.via (connPool)
.map (procHttpResp)
def procHttpResp (p: (Try[HttpResponse], CustHttpReq)): Future[ByteString] = {
val (rsp, src) = p
rsp match {
case Success(response: HttpResponse) =>
for (buf <- cvtToHomeGrown (response, src))
yield buf
case Failure(ex) => ...
}
}
def cvtToHomeGrown (rsp: HttpResponse): Future[ByteString] = {
rsp.entity.dataBytes.runWith (Sink.fold (ByteString.empty)(_ ++ _))
.map (cvtToHomeGrownActually) // has signature String => ByteString
}
// as Server part
val parseAndAskFlow = Flow[ByteString]
.via(Framing.delimiter(
ByteString('\n'))
.map (buf => cvtToCustHttpReq (buf))
.via (asClientFlow) // plug-in asClient part, the problem is here
val asServerConn: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("localhost",port)
asServerConn.runForeach (conn => conn.handleWith(parseAndAskFlow)
问题是 conn.handleWith 需要 Flow[ByteString,ByteString,],但是 http 客户端代码 (rsp.entity.dataBytes...) returns Future[ByteSring],所以 parseAndAskFlow 有Flow[ByteString,Future[ByteString],] 类型,我不知道在哪里可以更好地完成它。我什至猜想这根本不是一个好主意,因为所有这些都是流,并且 Await somethere 会停止很好的异步处理,但代码不会被编译。
使用 mapAsync
而不是 map
将 asClientFlow
的类型更改为 Flow[CustHttpReq, ByteString]
:
val asClientFlow: Flow[CustHttpReq, ByteString] =
Flow[CustHttpReq]
.via(connPool)
.mapAsync(1)(procHttpResp)
那么parseAndAskFlow
可以是类型Flow[ByteString, ByteString]
:
val parseAndAskFlow: Flow[ByteString, ByteString] =
Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"))
.map(cvtToCustHttpReq)
.via(asClientFlow)