http => akka 流 => http
http => akka stream => http
我想使用 akka 流来将一些 json 网络服务连接在一起。我想知道从 http 请求生成流并将块流到另一个请求的最佳方法。
有没有一种方法可以定义这样的图形并 运行 它而不是下面的代码?
到目前为止,我尝试这样做,不确定它是否真的在流式传输:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
// https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))
// Forward the response to next job and pipes the request response to dedicated actor
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`,
initialRes)
))
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
这应该是一个相当于你的 receive
:
的图表
Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
.collect {
case Some(initialRes) => initialRes
}
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
.via(Http().superPool[Unit]())
这个的类型是Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]
,其中Unit
是一个相关ID,如果你想知道哪个请求对应于到达的响应,你可以使用它,并且HostConnectionPool
具体化值可用于关闭与主机的连接。只有 cachedHostConnectionPool
返回这个物化值,superPool
可能会自行处理(虽然我没有检查过)。无论如何,我建议您在关闭应用程序时只使用 Http().shutdownAllConnectionPools()
,除非您出于某种原因需要其他方式。根据我的经验,它更不容易出错(例如忘记关机)。
你也可以使用Graph DSL,来表达同一个图:
val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
val host2Flow = b.add(Http().superPool[Unit]())
val toInitialRes = b.add(
Flow[(Try[HttpResponse], Unit)]
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
)
val keepOkStatus = b.add(
Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
.collect {
case Some(initialRes) => initialRes
}
)
val toOtherHost = b.add(
Flow[Source[HttpEntity.ChunkStreamPart, Any]]
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
)
host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow
FlowShape(host1Flow.in, host2Flow.out)
})
我想使用 akka 流来将一些 json 网络服务连接在一起。我想知道从 http 请求生成流并将块流到另一个请求的最佳方法。 有没有一种方法可以定义这样的图形并 运行 它而不是下面的代码? 到目前为止,我尝试这样做,不确定它是否真的在流式传输:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
// https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))
// Forward the response to next job and pipes the request response to dedicated actor
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`,
initialRes)
))
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
这应该是一个相当于你的 receive
:
Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
.collect {
case Some(initialRes) => initialRes
}
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
.via(Http().superPool[Unit]())
这个的类型是Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]
,其中Unit
是一个相关ID,如果你想知道哪个请求对应于到达的响应,你可以使用它,并且HostConnectionPool
具体化值可用于关闭与主机的连接。只有 cachedHostConnectionPool
返回这个物化值,superPool
可能会自行处理(虽然我没有检查过)。无论如何,我建议您在关闭应用程序时只使用 Http().shutdownAllConnectionPools()
,除非您出于某种原因需要其他方式。根据我的经验,它更不容易出错(例如忘记关机)。
你也可以使用Graph DSL,来表达同一个图:
val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
val host2Flow = b.add(Http().superPool[Unit]())
val toInitialRes = b.add(
Flow[(Try[HttpResponse], Unit)]
.collect {
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
val initialRes = entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(bStr => ChunkStreamPart(bStr.utf8String))
Some(initialRes)
case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
None
}
)
val keepOkStatus = b.add(
Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
.collect {
case Some(initialRes) => initialRes
}
)
val toOtherHost = b.add(
Flow[Source[HttpEntity.ChunkStreamPart, Any]]
.map { initialRes =>
(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
),
())
}
)
host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow
FlowShape(host1Flow.in, host2Flow.out)
})