Http Websocket 作为 Akka 流源
Http Websocket as Akka Stream Source
我想使用 akka 流在 websocket 上收听。也就是说,我只想把它当作 Source
.
但是,所有 official examples 都将 websocket 连接视为 Flow
。
我目前的方法是结合使用 websocketClientFlow
和 Source.maybe
。当没有新的 Message
被发送到流中时,这最终会导致上游由于 TcpIdleTimeoutException
而失败。
因此,我的问题是双重的:
- 有没有一种方法——我显然错过了——将 websocket 视为
Source
?
- 如果使用
Flow
是唯一的选择,如何正确处理 TcpIdleTimeoutException
?无法通过提供流监督策略来处理异常。使用 RestartSource
重新启动源也无济于事,因为源不是问题所在。
更新
所以我尝试了两种不同的方法,为方便起见将空闲超时设置为 1 秒
application.conf
akka.http.client.idle-timeout = 1s
使用 keepAlive(按照 Stefano 的建议)
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
执行此操作时,上游仍然失败并显示 TcpIdleTimeoutException
。
使用 RestartFlow
但是,我发现了 this approach,使用 RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
这是因为我可以将 websocket 视为源(尽管是人为的,正如 Stefano 所解释的那样)并通过在 Exception
发生时重新启动 websocketClientFlow
来保持 tcp 连接处于活动状态。
不过感觉这不是最佳解决方案。
希望我能正确理解你的问题。您在寻找 asSourceOf
吗?
path("measurements") {
entity(asSourceOf[Measurement]) { measurements =>
// measurement has type Source[Measurement, NotUsed]
...
}
}
没有。 WebSocket 是双向通道,因此 Akka-HTTP 将其建模为 Flow
。如果在您的特定情况下您只关心通道的一侧,则由您使用 Flow.fromSinkAndSource(Sink.ignore, mySource)
或 Flow.fromSinkAndSource(mySink, Source.maybe)
形成 Flow
和 "muted" 一侧, 视情况而定。
根据 documentation:
Inactive WebSocket connections will be dropped according to the
idle-timeout settings. In case you need to keep inactive connections
alive, you can either tweak your idle-timeout or inject ‘keep-alive’
messages regularly.
有一个用于注入保持活动消息的临时组合器,请参见下面的示例和此 Akka cookbook recipe。注意:这应该发生在客户端。
src.keepAlive(1.second, () => TextMessage.Strict("ping"))
我想使用 akka 流在 websocket 上收听。也就是说,我只想把它当作 Source
.
但是,所有 official examples 都将 websocket 连接视为 Flow
。
我目前的方法是结合使用 websocketClientFlow
和 Source.maybe
。当没有新的 Message
被发送到流中时,这最终会导致上游由于 TcpIdleTimeoutException
而失败。
因此,我的问题是双重的:
- 有没有一种方法——我显然错过了——将 websocket 视为
Source
? - 如果使用
Flow
是唯一的选择,如何正确处理TcpIdleTimeoutException
?无法通过提供流监督策略来处理异常。使用RestartSource
重新启动源也无济于事,因为源不是问题所在。
更新
所以我尝试了两种不同的方法,为方便起见将空闲超时设置为 1 秒
application.conf
akka.http.client.idle-timeout = 1s
使用 keepAlive(按照 Stefano 的建议)
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
执行此操作时,上游仍然失败并显示 TcpIdleTimeoutException
。
使用 RestartFlow
但是,我发现了 this approach,使用 RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
这是因为我可以将 websocket 视为源(尽管是人为的,正如 Stefano 所解释的那样)并通过在 Exception
发生时重新启动 websocketClientFlow
来保持 tcp 连接处于活动状态。
不过感觉这不是最佳解决方案。
希望我能正确理解你的问题。您在寻找 asSourceOf
吗?
path("measurements") {
entity(asSourceOf[Measurement]) { measurements =>
// measurement has type Source[Measurement, NotUsed]
...
}
}
没有。 WebSocket 是双向通道,因此 Akka-HTTP 将其建模为
Flow
。如果在您的特定情况下您只关心通道的一侧,则由您使用Flow.fromSinkAndSource(Sink.ignore, mySource)
或Flow.fromSinkAndSource(mySink, Source.maybe)
形成Flow
和 "muted" 一侧, 视情况而定。根据 documentation:
Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
有一个用于注入保持活动消息的临时组合器,请参见下面的示例和此 Akka cookbook recipe。注意:这应该发生在客户端。
src.keepAlive(1.second, () => TextMessage.Strict("ping"))