当客户端关闭 Web 套接字连接时停止 Akka 流源

Stop Akka stream Source when web socket connection is closed by the client

我有一个 akka http web socket Route,其代码类似于:

private val wsReader: Route = 路径("v1"/"data"/"ws"){ log.info("Opening websocket connecting ...")

  val testSource = Source
    .repeat("Hello")
    .throttle(1, 1.seconds)
    .map(x => {
      println(x)
      x
    })
    .map(TextMessage.Strict)
    .limit(1000)

  extractUpgradeToWebSocket { upgrade ⇒
    complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
  }
}

一切正常(我每秒从客户端收到 1 条测试消息)。唯一的问题是,如果客户端关闭网络套接字连接,我不明白如何 stop/close Source (testSource)。

如果 Web 套接字已关闭,您可以看到源继续生成元素(请参阅 println)。

如何检测客户端断开连接?

一种方法是使用 KillSwitches 来处理 testSource 关闭。

private val wsReader: Route =
path("v1" / "data" / "ws") {
  logger.info("Opening websocket connecting ...")

  val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

  val testSource =
    Source
     .repeat("Hello")
     .throttle(1, 1.seconds)
     .map(x => {
       println(x)
       x
     })
    .map(TextMessage.Strict)
    .limit(1000)
    .via(sharedKillSwitch.flow)

  extractUpgradeToWebSocket { upgrade ⇒
    val inSink = Sink.onComplete(_ => sharedKillSwitch.shutdown())
    val outSource = testSource
    val socket = upgrade.handleMessagesWithSinkSource(inSink, outSource)

    complete(socket)
  }
}

handleMessagesWithSinkSource 实现为:

/**
 * The high-level interface to create a WebSocket server based on "messages".
 *
 * Returns a response to return in a request handler that will signal the
 * low-level HTTP implementation to upgrade the connection to WebSocket and
 * use the supplied inSink to consume messages received from the client and
 * the supplied outSource to produce message to sent to the client.
 *
 * Optionally, a subprotocol out of the ones requested by the client can be chosen.
 */
def handleMessagesWithSinkSource(
  inSink:      Graph[SinkShape[Message], Any],
  outSource:   Graph[SourceShape[Message], Any],
  subprotocol: Option[String]                   = None): HttpResponse =

  handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)

这意味着接收器和源是独立的,事实上,即使客户端关闭了连接的传入端,源也应该继续生成元素。不过,当客户端完全重置连接时,它应该会停止。

要在传入连接关闭后立即停止生成传出数据,您可以使用 Flow.fromSinkAndSourceCoupled,因此:

val socket = upgrade.handleMessages(
  Flow.fromSinkAndSourceCoupled(inSink, outSource)
  subprotocol = None
)