Akka HTTP 客户端 websocket 意外关闭

Akka HTTP client side websocket closes unexpectedly

我有一个 websocket 端点,它每秒向客户端发送一条文本消息。客户端从不向服务器发送任何消息。

使用下面的 JS 代码,它按预期工作,它每秒不断注销消息:

var ws = new WebSocket("ws://url_of_my_endpoint");
ws.onmessage = (message) => console.log(message.data);

我想在 Scala 中使用 Akka HTTP 创建一个类似的消费者。 我根据 official docs.

创建了以下代码
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

val url = "ws://url_of_my_endpoint"

val outgoing: Source[Message, NotUsed] = Source.empty

val webSocketFlow =
  Http().webSocketClientFlow(WebSocketRequest(url))

val printSink: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println("message received: " + message.text)
    case _  => println("some other message")
  }

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .toMat(printSink)(Keep.both)
    .run()

val connected = upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Done
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

connected.onComplete(_ => println("Connection established."))
closed.foreach(_ => println("Connection closed."))

问题是连接在几秒钟后关闭。有时 1 秒后,有时 3-4 秒后。 JS客户端工作正常,所以我认为问题不在服务器上。

代码中有什么问题?应该怎么改,所以它告诉我哪里出了问题?

来自documentation

Note

Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

问题是您没有通过流发送任何消息,因此非活动连接已关闭:

val outgoing: Source[Message, NotUsed] = Source.empty

试试下面这样的方法,每秒发送一个随机 TextMessage

import scala.concurrent.duration._

val outgoing: Source[Message, NotUsed] =
  Source
    .fromIterator(() => Iterator.continually(TextMessage(scala.util.Random.nextInt().toString)))
    .throttle(1, 1 second)

或者,调整上述空闲超时设置或配置 automatic keep-alive support:

This is supported in a transparent way via configuration by setting the: akka.http.client.websocket.periodic-keep-alive-max-idle = 1 second to a specified max idle timeout. The keep alive triggers when no other messages are in-flight during the such configured period. Akka HTTP will then automatically send a Ping frame for each of such idle intervals.

By default, the automatic keep-alive feature is disabled.

自从您查看文档以来,文档可能已经更改,因为现在有一个部分可以处理您遇到的问题: https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets

说明:

The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed). This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server

所以行

val outgoing: Source[Message, NotUsed] = Source.empty

导致了问题。并且可以通过以下永远不会完成的行来修复(除非您完成链接到 Source.maybePromise

val outgoing = Source.empty.concatMat(Source.maybe[Message])(Keep.right)

我自己 运行 遇到了这个问题,发现行为非常混乱。