http => akka 流 => http

http => akka stream => http

我想使用 akka 流来将一些 json 网络服务连接在一起。我想知道从 http 请求生成流并将块流到另一个请求的最佳方法。 有没有一种方法可以定义这样的图形并 运行 它而不是下面的代码? 到目前为止,我尝试这样做,不确定它是否真的在流式传输:

override def receive: Receive = {
   case GetTestData(p, id) =>
     // Get the data and pipes it to itself through a message as recommended
     // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
     http.singleRequest(HttpRequest(uri = uri.format(p, id)))
       .pipeTo(self)

   case HttpResponse(StatusCodes.OK, _, entity, _) =>
     val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))

     // Forward the response to next job and pipes the request response to dedicated actor
     http.singleRequest(HttpRequest(
       method = HttpMethods.POST,
       uri = "googl.cm/flow",
       entity = HttpEntity.Chunked(ContentTypes.`application/json`, 
       initialRes)
     ))


   case resp @ HttpResponse(code, _, _, _) =>
     log.error("Request to test job failed, response code: " + code)
     // Discard the flow to avoid backpressure
     resp.discardEntityBytes()

   case _ => log.warning("Unexpected message in TestJobActor")
 }

这应该是一个相当于你的 receive:

的图表
Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))
.collect {
  case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
    val initialRes = entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(bStr => ChunkStreamPart(bStr.utf8String))
    Some(initialRes)

  case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
    log.error("Request to test job failed, response code: " + code)
    // Discard the flow to avoid backpressure
    resp.discardEntityBytes()
    None
}
.collect {
  case Some(initialRes) => initialRes
}
.map { initialRes =>
  (HttpRequest(
     method = HttpMethods.POST,
     uri = "googl.cm/flow",
     entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
   ),
   ())
}
.via(Http().superPool[Unit]())

这个的类型是Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool],其中Unit是一个相关ID,如果你想知道哪个请求对应于到达的响应,你可以使用它,并且HostConnectionPool具体化值可用于关闭与主机的连接。只有 cachedHostConnectionPool 返回这个物化值,superPool 可能会自行处理(虽然我没有检查过)。无论如何,我建议您在关闭应用程序时只使用 Http().shutdownAllConnectionPools() ,除非您出于某种原因需要其他方式。根据我的经验,它更不容易出错(例如忘记关机)。

你也可以使用Graph DSL,来表达同一个图:

val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

    val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
    val host2Flow = b.add(Http().superPool[Unit]())

    val toInitialRes = b.add(
      Flow[(Try[HttpResponse], Unit)]
        .collect {
          case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
            val initialRes = entity.dataBytes
              .via(JsonFraming.objectScanner(Int.MaxValue))
              .map(bStr => ChunkStreamPart(bStr.utf8String))
            Some(initialRes)

          case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
            log.error("Request to test job failed, response code: " + code)
            // Discard the flow to avoid backpressure
            resp.discardEntityBytes()
            None
        }
    )

    val keepOkStatus = b.add(
      Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
        .collect {
          case Some(initialRes) => initialRes
        }
    )

    val toOtherHost = b.add(
      Flow[Source[HttpEntity.ChunkStreamPart, Any]]
        .map { initialRes =>
          (HttpRequest(
             method = HttpMethods.POST,
             uri = "googl.cm/flow",
             entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
           ),
           ())
        }
    )

    host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow

    FlowShape(host1Flow.in, host2Flow.out)
})