用akka-http应答一个特定的客户端,也支持广播

Answer a specific client with akka-http and also support broadcast

我对使用 akka-http 库创建服务器有点迷茫。我需要建立的通讯如下:

鉴于:

这是我的 websocket 端点:

path("socket") {
  handleWebSocketMessages(listen())
}

这里是 listen() 方法:

// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()

def listen(): Flow[Message, Message, NotUsed] = {
  val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
  val outbound: Source[Message, SourceQueueWithComplete[Message]] =
    Source.queue[Message](16, OverflowStrategy.fail)

  Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
    offers ::= outboundMat.offer
    NotUsed
  })
}

def sendText(text: String): Unit = {
  for (connection <- offers) connection(TextMessage.Strict(text))
}

通过这种方法,我可以注册多个客户端并使用 sendText(text: String) 方法回答他们。但是,有一个大问题:在我评估了它的命令后,我如何只回答特定的客户。 (参见 (*)

[困扰我的另一件事是 offers 是一个 var,当以纯 FP 方式编程时这似乎是错误的,但如果其余的工作正常我可以接受]

编辑:

为了详细说明,我基本上需要能够实现如下所示的方法:

def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
  val response: TextMessage = handleMessage(m)
  answer(response)
}

但是我不知道在我的 websocket Flow 中在哪里调用这个方法。

我不确定这是否可行,但这似乎有效:

var actors: List[ActorRef] = Nil

private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
    val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()

    actors = actor :: actors

    val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
      Flow[ws.Message]
        .merge(source)
        .map {
          case TextMessage.Strict(tm) => handleMessage(actor, tm)
          case _ => TextMessage.Strict("Ignored message!")
        }
    wsHandler
  }

  def broadcast(msg: String): Unit = {
    actors.foreach(_ ! TextMessage.Strict(msg))
  }