当客户端关闭 Web 套接字连接时停止 Akka 流源
Stop Akka stream Source when web socket connection is closed by the client
我有一个 akka http web socket Route
,其代码类似于:
private val wsReader: Route =
路径("v1"/"data"/"ws"){
log.info("Opening websocket connecting ...")
val testSource = Source
.repeat("Hello")
.throttle(1, 1.seconds)
.map(x => {
println(x)
x
})
.map(TextMessage.Strict)
.limit(1000)
extractUpgradeToWebSocket { upgrade ⇒
complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
}
}
一切正常(我每秒从客户端收到 1 条测试消息)。唯一的问题是,如果客户端关闭网络套接字连接,我不明白如何 stop/close Source
(testSource
)。
如果 Web 套接字已关闭,您可以看到源继续生成元素(请参阅 println
)。
如何检测客户端断开连接?
一种方法是使用 KillSwitches 来处理 testSource 关闭。
private val wsReader: Route =
path("v1" / "data" / "ws") {
logger.info("Opening websocket connecting ...")
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val testSource =
Source
.repeat("Hello")
.throttle(1, 1.seconds)
.map(x => {
println(x)
x
})
.map(TextMessage.Strict)
.limit(1000)
.via(sharedKillSwitch.flow)
extractUpgradeToWebSocket { upgrade ⇒
val inSink = Sink.onComplete(_ => sharedKillSwitch.shutdown())
val outSource = testSource
val socket = upgrade.handleMessagesWithSinkSource(inSink, outSource)
complete(socket)
}
}
handleMessagesWithSinkSource 实现为:
/**
* The high-level interface to create a WebSocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to WebSocket and
* use the supplied inSink to consume messages received from the client and
* the supplied outSource to produce message to sent to the client.
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessagesWithSinkSource(
inSink: Graph[SinkShape[Message], Any],
outSource: Graph[SourceShape[Message], Any],
subprotocol: Option[String] = None): HttpResponse =
handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
这意味着接收器和源是独立的,事实上,即使客户端关闭了连接的传入端,源也应该继续生成元素。不过,当客户端完全重置连接时,它应该会停止。
要在传入连接关闭后立即停止生成传出数据,您可以使用 Flow.fromSinkAndSourceCoupled
,因此:
val socket = upgrade.handleMessages(
Flow.fromSinkAndSourceCoupled(inSink, outSource)
subprotocol = None
)
我有一个 akka http web socket Route
,其代码类似于:
private val wsReader: Route = 路径("v1"/"data"/"ws"){ log.info("Opening websocket connecting ...")
val testSource = Source
.repeat("Hello")
.throttle(1, 1.seconds)
.map(x => {
println(x)
x
})
.map(TextMessage.Strict)
.limit(1000)
extractUpgradeToWebSocket { upgrade ⇒
complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
}
}
一切正常(我每秒从客户端收到 1 条测试消息)。唯一的问题是,如果客户端关闭网络套接字连接,我不明白如何 stop/close Source
(testSource
)。
如果 Web 套接字已关闭,您可以看到源继续生成元素(请参阅 println
)。
如何检测客户端断开连接?
一种方法是使用 KillSwitches 来处理 testSource 关闭。
private val wsReader: Route =
path("v1" / "data" / "ws") {
logger.info("Opening websocket connecting ...")
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val testSource =
Source
.repeat("Hello")
.throttle(1, 1.seconds)
.map(x => {
println(x)
x
})
.map(TextMessage.Strict)
.limit(1000)
.via(sharedKillSwitch.flow)
extractUpgradeToWebSocket { upgrade ⇒
val inSink = Sink.onComplete(_ => sharedKillSwitch.shutdown())
val outSource = testSource
val socket = upgrade.handleMessagesWithSinkSource(inSink, outSource)
complete(socket)
}
}
handleMessagesWithSinkSource 实现为:
/**
* The high-level interface to create a WebSocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to WebSocket and
* use the supplied inSink to consume messages received from the client and
* the supplied outSource to produce message to sent to the client.
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessagesWithSinkSource(
inSink: Graph[SinkShape[Message], Any],
outSource: Graph[SourceShape[Message], Any],
subprotocol: Option[String] = None): HttpResponse =
handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
这意味着接收器和源是独立的,事实上,即使客户端关闭了连接的传入端,源也应该继续生成元素。不过,当客户端完全重置连接时,它应该会停止。
要在传入连接关闭后立即停止生成传出数据,您可以使用 Flow.fromSinkAndSourceCoupled
,因此:
val socket = upgrade.handleMessages(
Flow.fromSinkAndSourceCoupled(inSink, outSource)
subprotocol = None
)