具有 HTTP 接口的 Akka actor 系统

Akka actor system with HTTP interface

我正在尝试创建一个 Akka 系统,它可以响应 HTTP 请求。我创建了一些可以很好地交换消息的演员。我还可以使用 akka-http 来响应 HTTP 请求。问题在于连接这两个部分。

TL;DR: 如何在 akka-http 请求处理期间与 Akka actors 对话?

我创建了一个负责启动 HTTP 系统的参与者:

class HttpActor extends Actor with ActorLogging  {
  /* implicits elided */

  private def initHttp() = {
    val route: Route =  path("status") { get { complete { "OK" } } }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }
  private var bound: Option[Future[Http.ServerBinding]] = None

  override def receive = {
    case HttpActor.Init =>
      bound match {
        case Some(x) => log.warning("Http already bootstrapping")
        case None =>
          bound = Some(initHttp(watcher))
      }

  }
}

object HttpActor {
  case object Init
}

如您所见,actor 在收到的第一条消息上创建 akka-http 服务(没有理由,真的,它也可以在构造函数中完成)。

现在,在处理请求的过程中,我需要与其他一些参与者进行通信,但无法正常工作。

我的做法:

  private def initInteractiveHttp() = {
    val route: Route =  path("status") { 
      get { complete { "OK" } } 
    } ~ path("ask") {
      get { complete {
        // Here are the interesting two lines:
        val otherActorResponse = someOtherActor ? SomeMessage
        otherActorResponse.mapTo[String]
    } }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }

这将发送 SomeMessagesomeOtherActor 并在完成请求-响应周期之前等待响应。然而,据我所知,消息将从根 HttpActor 发送,这很糟糕,并且在可伸缩性方面无济于事。理想情况下,我会为每个请求创建一个不同的专门参与者实例,但由于 akka-http 键入而失败。考虑以下示例:

class DisposableActor(httpContext: HttpContext) {
    override def preStart() = {
       // ... send some questions to other actors
    }
    override def receive = {
      case GotAllData(x) => httpContext.complete(x)
    }
}

class HttpActorWithDisposables {
  // there is a `context` value in scope - we're an actor, after all
  private def initHttpWithDisposableActors() = {
    val route: Route =  path("status") { 
      get { complete { "OK" } } 
    } ~ path("ask") {
      get { httpContext =>
        val props = Props(new DisposableActor(httpContext))
        val disposableActor = context.actorOf(props, "disposable-actor-name")
        // I have nothing to return here
      }
    }
    Http()(context.system).bindAndHandle(route, "localhost", 8080)
  }

通过这种方法,我可以强制 DisposableActor 在某个时间点调用 httpContext.complete。这应该正确地结束请求-响应处理周期。但是,Route DSL 需要 return 在 get 块内的有效响应(或 Future),因此这种方法不起作用。

其实你的第一种方法还不错。 ask 模式为您创建了一个轻量级、一次性的 actor,它等待结果(非阻塞地)以完成未来。它基本上完成了您想用 DisposableActor 复制的所有事情,并且您的主要 HttpActor 不受此压力。

如果您还想使用不同的演员,有 completeWith:

completeWith(instanceOf[String]) { complete =>
  // complete is of type String => Unit
  val props = Props(new DisposableActor(complete))
  val disposableActor = context.actorOf(props, "disposable-actor-name")

  // completeWith expects you to return unit
}

在你的 actor 中,当你有结果时调用完整的函数

class DisposableActor(complete: String => Unit) {
  override def receive = {
    case GotAllData(x) => 
      complete(x)
      context stop self // don't for get to stop the actor
  }
}