具有 HTTP 接口的 Akka actor 系统
Akka actor system with HTTP interface
我正在尝试创建一个 Akka 系统,它可以响应 HTTP 请求。我创建了一些可以很好地交换消息的演员。我还可以使用 akka-http 来响应 HTTP 请求。问题在于连接这两个部分。
TL;DR: 如何在 akka-http 请求处理期间与 Akka actors 对话?
我创建了一个负责启动 HTTP 系统的参与者:
class HttpActor extends Actor with ActorLogging {
/* implicits elided */
private def initHttp() = {
val route: Route = path("status") { get { complete { "OK" } } }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
private var bound: Option[Future[Http.ServerBinding]] = None
override def receive = {
case HttpActor.Init =>
bound match {
case Some(x) => log.warning("Http already bootstrapping")
case None =>
bound = Some(initHttp(watcher))
}
}
}
object HttpActor {
case object Init
}
如您所见,actor 在收到的第一条消息上创建 akka-http 服务(没有理由,真的,它也可以在构造函数中完成)。
现在,在处理请求的过程中,我需要与其他一些参与者进行通信,但无法正常工作。
我的做法:
private def initInteractiveHttp() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { complete {
// Here are the interesting two lines:
val otherActorResponse = someOtherActor ? SomeMessage
otherActorResponse.mapTo[String]
} }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
这将发送 SomeMessage
到 someOtherActor
并在完成请求-响应周期之前等待响应。然而,据我所知,消息将从根 HttpActor
发送,这很糟糕,并且在可伸缩性方面无济于事。理想情况下,我会为每个请求创建一个不同的专门参与者实例,但由于 akka-http 键入而失败。考虑以下示例:
class DisposableActor(httpContext: HttpContext) {
override def preStart() = {
// ... send some questions to other actors
}
override def receive = {
case GotAllData(x) => httpContext.complete(x)
}
}
class HttpActorWithDisposables {
// there is a `context` value in scope - we're an actor, after all
private def initHttpWithDisposableActors() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { httpContext =>
val props = Props(new DisposableActor(httpContext))
val disposableActor = context.actorOf(props, "disposable-actor-name")
// I have nothing to return here
}
}
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
通过这种方法,我可以强制 DisposableActor 在某个时间点调用 httpContext.complete
。这应该正确地结束请求-响应处理周期。但是,Route DSL 需要 return 在 get
块内的有效响应(或 Future),因此这种方法不起作用。
其实你的第一种方法还不错。
ask 模式为您创建了一个轻量级、一次性的 actor,它等待结果(非阻塞地)以完成未来。它基本上完成了您想用 DisposableActor
复制的所有事情,并且您的主要 HttpActor 不受此压力。
如果您还想使用不同的演员,有 completeWith
:
completeWith(instanceOf[String]) { complete =>
// complete is of type String => Unit
val props = Props(new DisposableActor(complete))
val disposableActor = context.actorOf(props, "disposable-actor-name")
// completeWith expects you to return unit
}
在你的 actor 中,当你有结果时调用完整的函数
class DisposableActor(complete: String => Unit) {
override def receive = {
case GotAllData(x) =>
complete(x)
context stop self // don't for get to stop the actor
}
}
我正在尝试创建一个 Akka 系统,它可以响应 HTTP 请求。我创建了一些可以很好地交换消息的演员。我还可以使用 akka-http 来响应 HTTP 请求。问题在于连接这两个部分。
TL;DR: 如何在 akka-http 请求处理期间与 Akka actors 对话?
我创建了一个负责启动 HTTP 系统的参与者:
class HttpActor extends Actor with ActorLogging {
/* implicits elided */
private def initHttp() = {
val route: Route = path("status") { get { complete { "OK" } } }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
private var bound: Option[Future[Http.ServerBinding]] = None
override def receive = {
case HttpActor.Init =>
bound match {
case Some(x) => log.warning("Http already bootstrapping")
case None =>
bound = Some(initHttp(watcher))
}
}
}
object HttpActor {
case object Init
}
如您所见,actor 在收到的第一条消息上创建 akka-http 服务(没有理由,真的,它也可以在构造函数中完成)。
现在,在处理请求的过程中,我需要与其他一些参与者进行通信,但无法正常工作。
我的做法:
private def initInteractiveHttp() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { complete {
// Here are the interesting two lines:
val otherActorResponse = someOtherActor ? SomeMessage
otherActorResponse.mapTo[String]
} }
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
这将发送 SomeMessage
到 someOtherActor
并在完成请求-响应周期之前等待响应。然而,据我所知,消息将从根 HttpActor
发送,这很糟糕,并且在可伸缩性方面无济于事。理想情况下,我会为每个请求创建一个不同的专门参与者实例,但由于 akka-http 键入而失败。考虑以下示例:
class DisposableActor(httpContext: HttpContext) {
override def preStart() = {
// ... send some questions to other actors
}
override def receive = {
case GotAllData(x) => httpContext.complete(x)
}
}
class HttpActorWithDisposables {
// there is a `context` value in scope - we're an actor, after all
private def initHttpWithDisposableActors() = {
val route: Route = path("status") {
get { complete { "OK" } }
} ~ path("ask") {
get { httpContext =>
val props = Props(new DisposableActor(httpContext))
val disposableActor = context.actorOf(props, "disposable-actor-name")
// I have nothing to return here
}
}
Http()(context.system).bindAndHandle(route, "localhost", 8080)
}
通过这种方法,我可以强制 DisposableActor 在某个时间点调用 httpContext.complete
。这应该正确地结束请求-响应处理周期。但是,Route DSL 需要 return 在 get
块内的有效响应(或 Future),因此这种方法不起作用。
其实你的第一种方法还不错。
ask 模式为您创建了一个轻量级、一次性的 actor,它等待结果(非阻塞地)以完成未来。它基本上完成了您想用 DisposableActor
复制的所有事情,并且您的主要 HttpActor 不受此压力。
如果您还想使用不同的演员,有 completeWith
:
completeWith(instanceOf[String]) { complete =>
// complete is of type String => Unit
val props = Props(new DisposableActor(complete))
val disposableActor = context.actorOf(props, "disposable-actor-name")
// completeWith expects you to return unit
}
在你的 actor 中,当你有结果时调用完整的函数
class DisposableActor(complete: String => Unit) {
override def receive = {
case GotAllData(x) =>
complete(x)
context stop self // don't for get to stop the actor
}
}