如何从 akka-stream 传播到 akka-http 以记录错误并正确通知客户端?
How to get error from akka-stream propagate to akka-http to both be logged and notify the client properly?
现在我正在使用 akka-stream
和 akka-HTTP
构建文件流 API。因此,我将流式源注入到实体中,以便将数据直接流式传输到 HTTP 客户端,如下所示:
complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
但是,如果由于某种原因流失败,连接将被 akka-http 关闭,无需进一步解释或记录。
我需要两样东西:
- 如何获取异常日志?
- 如何在关闭连接前通过消息通知我的客户?
谢谢
如评论中所述,HTTP 协议不允许向客户端发送错误信号。
关于日志记录:
对我来说,归结为在 akka http 中缺少正确的访问日志指令。
在我当前的项目中,我们有装饰器,它在将 http 实体提供给 akka http 进行渲染之前为 http 实体注册 onComplete 处理程序。
private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
action(response.status)
response
} else {
val dataBytes =
onStreamEnd(response.entity) { result =>
val overallStatusCode =
result match {
case Success(_) =>
response.status
case Failure(e) =>
logger.error(e, s"error streaming response [${e.getMessage}]")
StatusCodes.InternalServerError
}
action(overallStatusCode)
}
response.withEntity(response.entity.contentLengthOption match {
case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
case None => HttpEntity(response.entity.contentType, dataBytes)
})
}
private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] ⇒ Unit): Source[ByteString, _] =
entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }
用法:
complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })
类似的方法,但使用自定义图表阶段,您可以找到 here
现在我正在使用 akka-stream
和 akka-HTTP
构建文件流 API。因此,我将流式源注入到实体中,以便将数据直接流式传输到 HTTP 客户端,如下所示:
complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
但是,如果由于某种原因流失败,连接将被 akka-http 关闭,无需进一步解释或记录。
我需要两样东西:
- 如何获取异常日志?
- 如何在关闭连接前通过消息通知我的客户?
谢谢
如评论中所述,HTTP 协议不允许向客户端发送错误信号。
关于日志记录: 对我来说,归结为在 akka http 中缺少正确的访问日志指令。
在我当前的项目中,我们有装饰器,它在将 http 实体提供给 akka http 进行渲染之前为 http 实体注册 onComplete 处理程序。
private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
action(response.status)
response
} else {
val dataBytes =
onStreamEnd(response.entity) { result =>
val overallStatusCode =
result match {
case Success(_) =>
response.status
case Failure(e) =>
logger.error(e, s"error streaming response [${e.getMessage}]")
StatusCodes.InternalServerError
}
action(overallStatusCode)
}
response.withEntity(response.entity.contentLengthOption match {
case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
case None => HttpEntity(response.entity.contentType, dataBytes)
})
}
private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] ⇒ Unit): Source[ByteString, _] =
entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }
用法:
complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })
类似的方法,但使用自定义图表阶段,您可以找到 here