如何禁用 Akka WebSocket 服务器上的消息缓冲?
How to disable the buffering of messages on an Akka WebSocket server?
我有一个非常简单的 Akka WebSocket 服务器,它以每行 400 毫秒的间隔将行从文件推送到连接的客户端。一切正常,除了网络服务器似乎在广播消息之前将消息缓冲了大约一分钟。
因此,当客户端连接时,我在服务器端看到每 400 毫秒读取一行并将其推送到 Sink
,但在客户端一分钟内我什么也得不到,然后是一阵大约 150 条消息(相当于一分钟的消息)。
是否有我忽略的设置?
object WebsocketServer extends App {
implicit val actorSystem = ActorSystem("WebsocketServer")
implicit val materializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
val fileSource =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
val delayedSource: Source[Strict, Future[IOResult]] =
fileSource
.map { line =>
Thread.sleep(400)
println(line.utf8String)
TextMessage(line.utf8String)
}
def route = path("") {
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(
Sink.ignore,
delayedSource)
)
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
bindingFuture.onComplete {
case Success(binding) ⇒
println(s"Server is listening on ws://localhost:8080")
case Failure(e) ⇒
println(s"Binding failed with ${e.getMessage}")
actorSystem.terminate()
}
}
所以 Thread.sleep(400)
的方法是错误的。我应该在来源上使用 .throttle
机制:
val delayedSource: Source[Strict, Future[IOResult]] =
fileSource
.throttle(elements = 1, per = 400.millis)
.map { line =>
println(line.utf8String)
TextMessage(line.utf8String)
}
这解决了问题。
我有一个非常简单的 Akka WebSocket 服务器,它以每行 400 毫秒的间隔将行从文件推送到连接的客户端。一切正常,除了网络服务器似乎在广播消息之前将消息缓冲了大约一分钟。
因此,当客户端连接时,我在服务器端看到每 400 毫秒读取一行并将其推送到 Sink
,但在客户端一分钟内我什么也得不到,然后是一阵大约 150 条消息(相当于一分钟的消息)。
是否有我忽略的设置?
object WebsocketServer extends App {
implicit val actorSystem = ActorSystem("WebsocketServer")
implicit val materializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
val fileSource =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
val delayedSource: Source[Strict, Future[IOResult]] =
fileSource
.map { line =>
Thread.sleep(400)
println(line.utf8String)
TextMessage(line.utf8String)
}
def route = path("") {
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(
Sink.ignore,
delayedSource)
)
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
bindingFuture.onComplete {
case Success(binding) ⇒
println(s"Server is listening on ws://localhost:8080")
case Failure(e) ⇒
println(s"Binding failed with ${e.getMessage}")
actorSystem.terminate()
}
}
所以 Thread.sleep(400)
的方法是错误的。我应该在来源上使用 .throttle
机制:
val delayedSource: Source[Strict, Future[IOResult]] =
fileSource
.throttle(elements = 1, per = 400.millis)
.map { line =>
println(line.utf8String)
TextMessage(line.utf8String)
}
这解决了问题。