Akka HTTP 客户端 websocket 意外关闭
Akka HTTP client side websocket closes unexpectedly
我有一个 websocket 端点,它每秒向客户端发送一条文本消息。客户端从不向服务器发送任何消息。
使用下面的 JS 代码,它按预期工作,它每秒不断注销消息:
var ws = new WebSocket("ws://url_of_my_endpoint");
ws.onmessage = (message) => console.log(message.data);
我想在 Scala 中使用 Akka HTTP 创建一个类似的消费者。
我根据 official docs.
创建了以下代码
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val url = "ws://url_of_my_endpoint"
val outgoing: Source[Message, NotUsed] = Source.empty
val webSocketFlow =
Http().webSocketClientFlow(WebSocketRequest(url))
val printSink: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("message received: " + message.text)
case _ => println("some other message")
}
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(printSink)(Keep.both)
.run()
val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
connected.onComplete(_ => println("Connection established."))
closed.foreach(_ => println("Connection closed."))
问题是连接在几秒钟后关闭。有时 1 秒后,有时 3-4 秒后。 JS客户端工作正常,所以我认为问题不在服务器上。
代码中有什么问题?应该怎么改,所以它告诉我哪里出了问题?
Note
Inactive WebSocket connections will be dropped according to the idle-timeout settings
. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
问题是您没有通过流发送任何消息,因此非活动连接已关闭:
val outgoing: Source[Message, NotUsed] = Source.empty
试试下面这样的方法,每秒发送一个随机 TextMessage
:
import scala.concurrent.duration._
val outgoing: Source[Message, NotUsed] =
Source
.fromIterator(() => Iterator.continually(TextMessage(scala.util.Random.nextInt().toString)))
.throttle(1, 1 second)
或者,调整上述空闲超时设置或配置 automatic keep-alive support:
This is supported in a transparent way via configuration by setting the: akka.http.client.websocket.periodic-keep-alive-max-idle = 1 second
to a specified max idle timeout. The keep alive triggers when no other messages are in-flight during the such configured period. Akka HTTP will then automatically send a Ping
frame for each of such idle intervals.
By default, the automatic keep-alive feature is disabled.
自从您查看文档以来,文档可能已经更改,因为现在有一个部分可以处理您遇到的问题:
https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets
说明:
The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed). This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server
所以行
val outgoing: Source[Message, NotUsed] = Source.empty
导致了问题。并且可以通过以下永远不会完成的行来修复(除非您完成链接到 Source.maybe
的 Promise
)
val outgoing = Source.empty.concatMat(Source.maybe[Message])(Keep.right)
我自己 运行 遇到了这个问题,发现行为非常混乱。
我有一个 websocket 端点,它每秒向客户端发送一条文本消息。客户端从不向服务器发送任何消息。
使用下面的 JS 代码,它按预期工作,它每秒不断注销消息:
var ws = new WebSocket("ws://url_of_my_endpoint");
ws.onmessage = (message) => console.log(message.data);
我想在 Scala 中使用 Akka HTTP 创建一个类似的消费者。 我根据 official docs.
创建了以下代码implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val url = "ws://url_of_my_endpoint"
val outgoing: Source[Message, NotUsed] = Source.empty
val webSocketFlow =
Http().webSocketClientFlow(WebSocketRequest(url))
val printSink: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("message received: " + message.text)
case _ => println("some other message")
}
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(printSink)(Keep.both)
.run()
val connected = upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
connected.onComplete(_ => println("Connection established."))
closed.foreach(_ => println("Connection closed."))
问题是连接在几秒钟后关闭。有时 1 秒后,有时 3-4 秒后。 JS客户端工作正常,所以我认为问题不在服务器上。
代码中有什么问题?应该怎么改,所以它告诉我哪里出了问题?
Note
Inactive WebSocket connections will be dropped according to the
idle-timeout settings
. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
问题是您没有通过流发送任何消息,因此非活动连接已关闭:
val outgoing: Source[Message, NotUsed] = Source.empty
试试下面这样的方法,每秒发送一个随机 TextMessage
:
import scala.concurrent.duration._
val outgoing: Source[Message, NotUsed] =
Source
.fromIterator(() => Iterator.continually(TextMessage(scala.util.Random.nextInt().toString)))
.throttle(1, 1 second)
或者,调整上述空闲超时设置或配置 automatic keep-alive support:
This is supported in a transparent way via configuration by setting the:
akka.http.client.websocket.periodic-keep-alive-max-idle = 1 second
to a specified max idle timeout. The keep alive triggers when no other messages are in-flight during the such configured period. Akka HTTP will then automatically send aPing
frame for each of such idle intervals.By default, the automatic keep-alive feature is disabled.
自从您查看文档以来,文档可能已经更改,因为现在有一个部分可以处理您遇到的问题: https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets
说明:
The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed). This may lead to unexpected behavior, for example if we are trying to only consume messages coming from the server
所以行
val outgoing: Source[Message, NotUsed] = Source.empty
导致了问题。并且可以通过以下永远不会完成的行来修复(除非您完成链接到 Source.maybe
的 Promise
)
val outgoing = Source.empty.concatMat(Source.maybe[Message])(Keep.right)
我自己 运行 遇到了这个问题,发现行为非常混乱。