如何禁用 Akka WebSocket 服务器上的消息缓冲?

How to disable the buffering of messages on an Akka WebSocket server?

我有一个非常简单的 Akka WebSocket 服务器,它以每行 400 毫秒的间隔将行从文件推送到连接的客户端。一切正常,除了网络服务器似乎在广播消息之前将消息缓冲了大约一分钟。

因此,当客户端连接时,我在服务器端看到每 400 毫秒读取一行并将其推送到 Sink,但在客户端一分钟内我什么也得不到,然后是一阵大约 150 条消息(相当于一分钟的消息)。

是否有我忽略的设置?

object WebsocketServer extends App {
  implicit val actorSystem = ActorSystem("WebsocketServer")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher

  val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
  val fileSource =
    FileIO.fromPath(file)
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))

  val delayedSource: Source[Strict, Future[IOResult]] =
    fileSource
      .map { line =>
        Thread.sleep(400)
        println(line.utf8String)
        TextMessage(line.utf8String)
      }

  def route = path("") {
    extractUpgradeToWebSocket { upgrade =>
      complete(upgrade.handleMessagesWithSinkSource(
        Sink.ignore,
        delayedSource)
      )
    }
  }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  bindingFuture.onComplete {
    case Success(binding) ⇒
      println(s"Server is listening on ws://localhost:8080")
    case Failure(e) ⇒
      println(s"Binding failed with ${e.getMessage}")
      actorSystem.terminate()
  }
}

所以 Thread.sleep(400) 的方法是错误的。我应该在来源上使用 .throttle 机制:

val delayedSource: Source[Strict, Future[IOResult]] =
    fileSource
      .throttle(elements = 1, per = 400.millis)
      .map { line =>
        println(line.utf8String)
        TextMessage(line.utf8String)
      }

这解决了问题。