Http Websocket 作为 Akka 流源

Http Websocket as Akka Stream Source

我想使用 akka 流在 websocket 上收听。也就是说,我只想把它当作 Source.

但是,所有 official examples 都将 websocket 连接视为 Flow

我目前的方法是结合使用 websocketClientFlowSource.maybe。当没有新的 Message 被发送到流中时,这最终会导致上游由于 TcpIdleTimeoutException 而失败。

因此,我的问题是双重的:

  1. 有没有一种方法——我显然错过了——将 websocket 视为 Source
  2. 如果使用 Flow 是唯一的选择,如何正确处理 TcpIdleTimeoutException?无法通过提供流监督策略来处理异常。使用 RestartSource 重新启动源也无济于事,因为源不是问题所在。

更新

所以我尝试了两种不同的方法,为方便起见将空闲超时设置为 1 秒

application.conf

akka.http.client.idle-timeout = 1s

使用 keepAlive(按照 Stefano 的建议)

Source.<Message>maybe()
    .keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
    .viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
    { ... }

执行此操作时,上游仍然失败并显示 TcpIdleTimeoutException

使用 RestartFlow

但是,我发现了 this approach,使用 RestartFlow:

final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
    Duration.apply(3, TimeUnit.SECONDS),
    Duration.apply(30, TimeUnit.SECONDS),
    0.2,
    () -> createWebsocketFlow(system, websocketUri)
);

Source.<Message>maybe()
    .viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
    { ... }

(...)

private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
    return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}

这是因为我可以将 websocket 视为源(尽管是人为的,正如 Stefano 所解释的那样)并通过在 Exception 发生时重新启动 websocketClientFlow 来保持 tcp 连接处于活动状态。

不过感觉这不是最佳解决方案。

希望我能正确理解你的问题。您在寻找 asSourceOf 吗?

path("measurements") {
  entity(asSourceOf[Measurement]) { measurements =>
    // measurement has type Source[Measurement, NotUsed]
    ...
  }
}
  1. 没有。 WebSocket 是双向通道,因此 Akka-HTTP 将其建模为 Flow。如果在您的特定情况下您只关心通道的一侧,则由您使用 Flow.fromSinkAndSource(Sink.ignore, mySource)Flow.fromSinkAndSource(mySink, Source.maybe) 形成 Flow 和 "muted" 一侧, 视情况而定。

  2. 根据 documentation:

    Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

    有一个用于注入保持活动消息的临时组合器,请参见下面的示例和此 Akka cookbook recipe。注意:这应该发生在客户端。

    src.keepAlive(1.second, () => TextMessage.Strict("ping"))