如何从 akka-stream 传播到 akka-http 以记录错误并正确通知客户端?

How to get error from akka-stream propagate to akka-http to both be logged and notify the client properly?

现在我正在使用 akka-streamakka-HTTP 构建文件流 API。因此,我将流式源注入到实体中,以便将数据直接流式传输到 HTTP 客户端,如下所示:

complete(HttpEntity(ContentTypes.`application/octet-stream`, source))

但是,如果由于某种原因流失败,连接将被 akka-http 关闭,无需进一步解释或记录。

我需要两样东西:

谢谢

如评论中所述,HTTP 协议不允许向客户端发送错误信号。

关于日志记录: 对我来说,归结为在 akka http 中缺少正确的访问日志指令。

在我当前的项目中,我们有装饰器,它在将 http 实体提供给 akka http 进行渲染之前为 http 实体注册 onComplete 处理程序。

  private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
    if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
      action(response.status)
      response
    } else {
      val dataBytes =
        onStreamEnd(response.entity) { result =>
          val overallStatusCode =
            result match {
              case Success(_) =>
                response.status

              case Failure(e) =>
                logger.error(e, s"error streaming response [${e.getMessage}]")
                StatusCodes.InternalServerError
            }

          action(overallStatusCode)
        }

      response.withEntity(response.entity.contentLengthOption match {
        case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
        case None         => HttpEntity(response.entity.contentType, dataBytes)
      })
    }

  private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] ⇒ Unit): Source[ByteString, _] =
    entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }

用法:

complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })

类似的方法,但使用自定义图表阶段,您可以找到 here