如何将 akka http 与 akka 流绑定?
How to bind akka http with akka streams?
我正在尝试使用流而不是纯 actor 来处理 http 请求,我使用了以下代码:
trait ImagesRoute {
val log = LoggerFactory.getLogger(this.getClass)
implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer
val source =
Source
.actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.toMat(Sink.asPublisher(true))(Keep.both)
val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>
val (ref, publisher) = source.run()
val addFuture = Source.fromPublisher(publisher)
val future = addFuture.runWith(Sink.head[Option[Image]])
ref ! image
onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)
case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)
}
}
}
}
}
}
}
我不确定这是否是正确的方法,或者即使这是一个好方法,或者我是否应该使用 actor 与路由交互,使用询问模式然后在 actor 内部, 传输所有内容。
有什么想法吗?
如果您只希望从实体获得 1 张图像,那么您不需要从 ActorRef 创建 Source
,也不需要 Sink.asPublisher
,您可以简单地使用 Source.single
:
def imageToComplete(img : Option[Image]) : StandardRoute =
img.map(i => complete(Created, i))
.getOrElse {
log error ("Error adding image resource", e)
complete(InternalServerError, e.getMessage
}
...
entity(as[Image]) { image =>
val future : Future[StandardRoute] =
Source.single(image)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.runWith(Sink.head[Option[Image]])
.map(imageToComplete)
onComplete(future)
}
进一步简化您的代码,您只处理 1 张图像这一事实意味着流是不必要的,因为只需要 1 个元素就不需要背压:
val future : Future[StandardRoute] = ImageRepository.add(image)
.map(imageToComplete)
onComplete(future)
在您指出的评论中
"this is just a simple example, but the stream pipeline should be
bigger doing a lot of things like contacting external resources and
eventually back pressure things"
这仅适用于您的实体是图像流的情况。如果您只为每个 HttpRequest 处理 1 个图像,那么背压永远不会应用,并且您创建的任何流都将是 。
如果您的实体实际上是图像流,那么您可以将其用作流的一部分:
val byteStrToImage : Flow[ByteString, Image, _] = ???
val imageToByteStr : Flow[Image, Source[ByteString], _] = ???
def imageOptToSource(img : Option[Image]) : Source[Image,_] =
Source fromIterator img.toIterator
val route = path("images") {
post {
extractRequestEntity { reqEntity =>
val stream = reqEntity.via(byteStrToImage)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.via(Flow.flatMapConcat(imageOptToSource))
.via(Flow.flatMapConcat(imageToByteStr))
complete(HttpResponse(status=Created,entity = stream))
}
}
}
我正在尝试使用流而不是纯 actor 来处理 http 请求,我使用了以下代码:
trait ImagesRoute {
val log = LoggerFactory.getLogger(this.getClass)
implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer
val source =
Source
.actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.toMat(Sink.asPublisher(true))(Keep.both)
val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>
val (ref, publisher) = source.run()
val addFuture = Source.fromPublisher(publisher)
val future = addFuture.runWith(Sink.head[Option[Image]])
ref ! image
onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)
case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)
}
}
}
}
}
}
}
我不确定这是否是正确的方法,或者即使这是一个好方法,或者我是否应该使用 actor 与路由交互,使用询问模式然后在 actor 内部, 传输所有内容。
有什么想法吗?
如果您只希望从实体获得 1 张图像,那么您不需要从 ActorRef 创建 Source
,也不需要 Sink.asPublisher
,您可以简单地使用 Source.single
:
def imageToComplete(img : Option[Image]) : StandardRoute =
img.map(i => complete(Created, i))
.getOrElse {
log error ("Error adding image resource", e)
complete(InternalServerError, e.getMessage
}
...
entity(as[Image]) { image =>
val future : Future[StandardRoute] =
Source.single(image)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.runWith(Sink.head[Option[Image]])
.map(imageToComplete)
onComplete(future)
}
进一步简化您的代码,您只处理 1 张图像这一事实意味着流是不必要的,因为只需要 1 个元素就不需要背压:
val future : Future[StandardRoute] = ImageRepository.add(image)
.map(imageToComplete)
onComplete(future)
在您指出的评论中
"this is just a simple example, but the stream pipeline should be bigger doing a lot of things like contacting external resources and eventually back pressure things"
这仅适用于您的实体是图像流的情况。如果您只为每个 HttpRequest 处理 1 个图像,那么背压永远不会应用,并且您创建的任何流都将是
如果您的实体实际上是图像流,那么您可以将其用作流的一部分:
val byteStrToImage : Flow[ByteString, Image, _] = ???
val imageToByteStr : Flow[Image, Source[ByteString], _] = ???
def imageOptToSource(img : Option[Image]) : Source[Image,_] =
Source fromIterator img.toIterator
val route = path("images") {
post {
extractRequestEntity { reqEntity =>
val stream = reqEntity.via(byteStrToImage)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.via(Flow.flatMapConcat(imageOptToSource))
.via(Flow.flatMapConcat(imageToByteStr))
complete(HttpResponse(status=Created,entity = stream))
}
}
}