Webflux websocketclient,如何在同一会话中发送多个请求[设计客户端库]
Webflux websocketclient, How to send multiple requests in same session[design client library]
TL;DR;
我们正在尝试使用 spring webflux WebSocket 实现来设计 WebSocket 服务器。服务器具有通常的 HTTP 服务器操作,例如create/fetch/update/fetchall
。使用 WebSockets 我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为 WebSockets 就是为此目的而设计的。 webflux 和 WebSockets 的设计是否正确?
长版
我们正在启动一个项目,该项目将使用来自 spring-webflux
的反应式网络套接字。我们需要构建一个响应式客户端库,消费者可以使用它来连接到服务器。
在服务器上, 我们收到一个请求,读取一条消息,保存它并 return 一个静态响应:
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<WebSocketMessage> response = webSocketSession.receive()
.map(WebSocketMessage::retain)
.concatMap(webSocketMessage -> Mono.just(webSocketMessage)
.map(parseBinaryToEvent) //logic to get domain object
.flatMap(e -> service.save(e))
.thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
);
return webSocketSession.send(response);
}
在客户端,我们想在有人调用save
方法时进行调用并且return来自server
的响应。
public Mono<String> save(Event message) {
new ReactorNettyWebSocketClient().execute(uri, session -> {
session
.send(Mono.just(session.binaryMessage(formatEventToMessage)))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println).then()); //how to return this to client
});
return null;
}
我们不确定如何着手设计它。理想情况下,我们认为应该有
1) client.execute
应该只调用一次并以某种方式保持 session
。在后续调用中应使用相同的会话发送数据。
2) 如何return我们在session.receive
中得到的服务器的响应?
3) 如果 fetch
在 session.receive
中响应很大(不仅仅是静态字符串,而是事件列表)怎么办?
我们正在做一些研究,但我们无法在线找到 webflux-websocket-client documentation/implementation 的合适资源。关于如何前进的任何指示。
不确定这种情况是否是您的问题??
我看到您正在发送静态通量响应(这是一个可关闭的流)
您需要一个打开的流来向该会话发送消息,例如您可以创建一个处理器
public class SocketMessageComponent {
private DirectProcessor<String> emitterProcessor;
private Flux<String> subscriber;
public SocketMessageComponent() {
emitterProcessor = DirectProcessor.create();
subscriber = emitterProcessor.share();
}
public Flux<String> getSubscriber() {
return subscriber;
}
public void sendMessage(String mesage) {
emitterProcessor.onNext(mesage);
}
}
然后你可以发送
public Mono<Void> handle(WebSocketSession webSocketSession) {
this.webSocketSession = webSocketSession;
return webSocketSession.send(socketMessageComponent.getSubscriber()
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText).log());
}
请!使用 RSocket!
这是绝对正确的设计,值得节省资源并为所有可能的操作为每个客户端使用一个连接。
但是,不要实现轮子并使用为您提供所有这些类型的通信的协议。
- RSocket 有一个请求-响应 模型,它允许您进行当今最常见的客户端-服务器交互。
- RSocket 有一个 request-stream 通信模型,因此您可以满足您的所有需求和 return 事件流异步重用相同的连接。 RSocket 将逻辑流映射到物理连接并返回,因此您不会感到自己这样做的痛苦。
- RSocket 有更多的交互模型,例如
fire-and-forget 和 stream-stream 这在以下情况下可能很有用
以两种方式发送数据流。
如何在Spring
中使用RSocket
其中一个选项是使用 RSocket-Java 实现 RSocket 协议。 RSocket-Java 建立在 Project Reactor 之上,因此它自然适合 Spring WebFlux 生态系统。
遗憾的是,没有与 Spring 生态系统的特色集成。幸运的是,我花了几个小时提供了一个简单的 RSocket Spring Boot Starter,它将 Spring WebFlux 与 RSocket 集成在一起,并公开了 WebSocket RSocket 服务器和 WebFlux Http 服务器。
为什么 RSocket 是更好的方法?
基本上,RSocket 隐藏了自己实现相同方法的复杂性。使用 RSocket,我们不必关心作为自定义协议和 Java 中的实现的交互模型定义。 RSocket 为我们将数据传送到特定的逻辑通道。它提供了一个内置客户端,可以将消息发送到同一个 WS 连接,因此我们不必为此发明自定义实现。
RSocket-RPC
让它变得更好
由于RSocket只是一个协议,它不提供任何消息格式,所以这个挑战是针对业务逻辑的。但是,有一个 RSocket-RPC 项目提供协议缓冲区作为消息格式,并重用与 GRPC 相同的代码生成技术。因此,使用 RSocket-RPC 我们可以轻松地为客户端和服务器构建一个 API,而无需关心传输和协议抽象。
相同的 RSocket Spring 引导集成也提供了 example RSocket-RPC 用法。
好吧,它没有说服我,我仍然想要一个自定义的 WebSocket 服务器
所以,为了这个目的,你必须自己实现这个地狱。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。
不过,我可以分享一些代码示例,它们可以帮助您构建合适的客户端和服务器。
服务器端
处理程序和打开逻辑订阅者映射
必须考虑的第一点是一个物理连接中的所有逻辑流都应该存储在某个地方:
class MyWebSocketRouter implements WebSocketHandler {
final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
@Override
public Mono<Void> handle(WebSocketSession session) {
final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
...
}
}
上面的示例中有两张地图。第一个是您的路由映射,它允许您根据传入的消息参数等来识别路由。第二个是为请求流用例创建的(在我的例子中它是活动订阅的地图),所以你可以发送一个消息框架来创建一个订阅,或者为你订阅一个特定的操作并保持该订阅,所以一旦取消订阅执行操作,如果存在订阅,您将被取消订阅。
使用处理器进行消息多路复用
为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用 UnicastProcessor
:
@Override
public Mono<Void> handle(WebSocketSession session) {
final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
...
return Mono
.subscriberContext()
.flatMap(context -> Flux.merge(
session
.receive()
...
.cast(ActionMessage.class)
.publishOn(Schedulers.parallel())
.doOnNext(am -> {
switch (am.type) {
case CREATE:
case UPDATE:
case CANCEL: {
...
}
case SUBSCRIBE: {
Flux<ResponseMessage<?>> flux = Flux
.from(
channelsMapping.get(am.getChannelId())
.get(ActionMessage.Type.SUBSCRIBE)
.handle(am) // returns Publisher<>
);
if (flux != null) {
channelsIdsToDisposableMap.compute(
am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
(cid, disposable) -> {
...
return flux
.subscriberContext(context)
.subscribe(
funIn::onNext, // send message to a Processor manually
e -> {
funIn.onNext(
new ResponseMessage<>( // send errors as a messages to Processor here
0,
e.getMessage(),
...
ResponseMessage.Type.ERROR
)
);
}
);
}
);
}
return;
}
case UNSABSCRIBE: {
Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
if (disposable != null) {
disposable.dispose();
}
}
}
})
.then(Mono.empty()),
funIn
...
.map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
.as(session::send)
).then()
);
}
正如我们从上面的示例中看到的,那里有一堆东西:
- 消息应包含路线信息
- 消息应包含与其相关的唯一流 ID。
- 用于消息多路复用的单独处理器,其中错误也应该是一条消息
- 每个频道都应该存储在某个地方,在这种情况下,我们有一个简单的用例,其中每条消息都可以提供
Flux
条消息或只是 Mono
(如果是单声道,它可以在服务器端实现更简单,因此您不必保留唯一的流 ID)。
- 此示例不包括消息编码解码,所以这个挑战就留给你了。
客户端
客户端也不简单:
处理会话
为了处理连接,我们必须分配两个处理器,以便进一步使用它们来多路复用和多路分解消息:
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
return Flux.merge(
session.receive()
.subscribeWith(incoming)
.then(Mono.empty()),
session.send(outgoing)
).then();
}
将所有逻辑流保存在某处
所有创建的流,无论是 Mono
还是 Flux
都应该存储在某个地方,以便我们能够区分与哪个流消息相关:
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
自MonoSink以来,我们必须保留两个映射,而FluxSink没有相同的父接口。
消息路由
在上面的示例中,我们只考虑了客户端的初始部分。现在我们要建立一个消息路由机制:
...
.subscribeWith(incoming)
.doOnNext(message -> {
if (monoSinkMap.containsKey(message.getStreamId())) {
MonoSink sink = monoSinkMap.get(message.getStreamId());
monoSinkMap.remove(message.getStreamId());
if (message.getType() == SUCCESS) {
sink.success(message.getData());
}
else {
sink.error(message.getCause());
}
} else if (fluxSinkMap.containsKey(message.getStreamId())) {
FluxSink sink = fluxSinkMap.get(message.getStreamId());
if (message.getType() == NEXT) {
sink.next(message.getData());
}
else if (message.getType() == COMPLETE) {
fluxSinkMap.remove(message.getStreamId());
sink.next(message.getData());
sink.complete();
}
else {
fluxSinkMap.remove(message.getStreamId());
sink.error(message.getCause());
}
}
})
以上代码示例展示了我们如何路由传入消息。
多重请求
最后一部分是消息多路复用。为此,我们将涵盖可能的发件人 class impl:
class Sender {
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
public Sender () {
// 在此处创建 websocket 连接并放置前面提到的代码
}
Mono<R> sendForMono(T data) {
//generate message with unique
return Mono.<R>create(sink -> {
monoSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Mono
});
}
Flux<R> sendForFlux(T data) {
return Flux.<R>create(sink -> {
fluxSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Flux
});
}
}
自定义实现总结
- 硬核
- 没有实现背压支持,所以这可能是另一个挑战
- 搬起石头砸自己的脚
外卖
- 请使用 RSocket,不要自己发明协议,这很难!!!
- 要从 Pivotal 的家伙那里了解更多关于 RSocket 的信息 - https://www.youtube.com/watch?v=WVnAbv65uCU
- 从我的一次演讲中了解更多关于 RSocket 的信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
- 有一个构建在 RSocket 之上的特色框架,称为 Proteus - 您可能对此感兴趣 - https://www.netifi.com/
- 从 RSocket 协议的核心开发人员那里了解有关 Proteus 的更多信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E
TL;DR;
我们正在尝试使用 spring webflux WebSocket 实现来设计 WebSocket 服务器。服务器具有通常的 HTTP 服务器操作,例如create/fetch/update/fetchall
。使用 WebSockets 我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为 WebSockets 就是为此目的而设计的。 webflux 和 WebSockets 的设计是否正确?
长版
我们正在启动一个项目,该项目将使用来自 spring-webflux
的反应式网络套接字。我们需要构建一个响应式客户端库,消费者可以使用它来连接到服务器。
在服务器上, 我们收到一个请求,读取一条消息,保存它并 return 一个静态响应:
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<WebSocketMessage> response = webSocketSession.receive()
.map(WebSocketMessage::retain)
.concatMap(webSocketMessage -> Mono.just(webSocketMessage)
.map(parseBinaryToEvent) //logic to get domain object
.flatMap(e -> service.save(e))
.thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
);
return webSocketSession.send(response);
}
在客户端,我们想在有人调用save
方法时进行调用并且return来自server
的响应。
public Mono<String> save(Event message) {
new ReactorNettyWebSocketClient().execute(uri, session -> {
session
.send(Mono.just(session.binaryMessage(formatEventToMessage)))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println).then()); //how to return this to client
});
return null;
}
我们不确定如何着手设计它。理想情况下,我们认为应该有
1) client.execute
应该只调用一次并以某种方式保持 session
。在后续调用中应使用相同的会话发送数据。
2) 如何return我们在session.receive
中得到的服务器的响应?
3) 如果 fetch
在 session.receive
中响应很大(不仅仅是静态字符串,而是事件列表)怎么办?
我们正在做一些研究,但我们无法在线找到 webflux-websocket-client documentation/implementation 的合适资源。关于如何前进的任何指示。
不确定这种情况是否是您的问题?? 我看到您正在发送静态通量响应(这是一个可关闭的流) 您需要一个打开的流来向该会话发送消息,例如您可以创建一个处理器
public class SocketMessageComponent {
private DirectProcessor<String> emitterProcessor;
private Flux<String> subscriber;
public SocketMessageComponent() {
emitterProcessor = DirectProcessor.create();
subscriber = emitterProcessor.share();
}
public Flux<String> getSubscriber() {
return subscriber;
}
public void sendMessage(String mesage) {
emitterProcessor.onNext(mesage);
}
}
然后你可以发送
public Mono<Void> handle(WebSocketSession webSocketSession) {
this.webSocketSession = webSocketSession;
return webSocketSession.send(socketMessageComponent.getSubscriber()
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText).log());
}
请!使用 RSocket!
这是绝对正确的设计,值得节省资源并为所有可能的操作为每个客户端使用一个连接。
但是,不要实现轮子并使用为您提供所有这些类型的通信的协议。
- RSocket 有一个请求-响应 模型,它允许您进行当今最常见的客户端-服务器交互。
- RSocket 有一个 request-stream 通信模型,因此您可以满足您的所有需求和 return 事件流异步重用相同的连接。 RSocket 将逻辑流映射到物理连接并返回,因此您不会感到自己这样做的痛苦。
- RSocket 有更多的交互模型,例如 fire-and-forget 和 stream-stream 这在以下情况下可能很有用 以两种方式发送数据流。
如何在Spring
中使用RSocket其中一个选项是使用 RSocket-Java 实现 RSocket 协议。 RSocket-Java 建立在 Project Reactor 之上,因此它自然适合 Spring WebFlux 生态系统。
遗憾的是,没有与 Spring 生态系统的特色集成。幸运的是,我花了几个小时提供了一个简单的 RSocket Spring Boot Starter,它将 Spring WebFlux 与 RSocket 集成在一起,并公开了 WebSocket RSocket 服务器和 WebFlux Http 服务器。
为什么 RSocket 是更好的方法?
基本上,RSocket 隐藏了自己实现相同方法的复杂性。使用 RSocket,我们不必关心作为自定义协议和 Java 中的实现的交互模型定义。 RSocket 为我们将数据传送到特定的逻辑通道。它提供了一个内置客户端,可以将消息发送到同一个 WS 连接,因此我们不必为此发明自定义实现。
RSocket-RPC
让它变得更好由于RSocket只是一个协议,它不提供任何消息格式,所以这个挑战是针对业务逻辑的。但是,有一个 RSocket-RPC 项目提供协议缓冲区作为消息格式,并重用与 GRPC 相同的代码生成技术。因此,使用 RSocket-RPC 我们可以轻松地为客户端和服务器构建一个 API,而无需关心传输和协议抽象。
相同的 RSocket Spring 引导集成也提供了 example RSocket-RPC 用法。
好吧,它没有说服我,我仍然想要一个自定义的 WebSocket 服务器
所以,为了这个目的,你必须自己实现这个地狱。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。 不过,我可以分享一些代码示例,它们可以帮助您构建合适的客户端和服务器。
服务器端
处理程序和打开逻辑订阅者映射
必须考虑的第一点是一个物理连接中的所有逻辑流都应该存储在某个地方:
class MyWebSocketRouter implements WebSocketHandler {
final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
@Override
public Mono<Void> handle(WebSocketSession session) {
final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
...
}
}
上面的示例中有两张地图。第一个是您的路由映射,它允许您根据传入的消息参数等来识别路由。第二个是为请求流用例创建的(在我的例子中它是活动订阅的地图),所以你可以发送一个消息框架来创建一个订阅,或者为你订阅一个特定的操作并保持该订阅,所以一旦取消订阅执行操作,如果存在订阅,您将被取消订阅。
使用处理器进行消息多路复用
为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用 UnicastProcessor
:
@Override
public Mono<Void> handle(WebSocketSession session) {
final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
...
return Mono
.subscriberContext()
.flatMap(context -> Flux.merge(
session
.receive()
...
.cast(ActionMessage.class)
.publishOn(Schedulers.parallel())
.doOnNext(am -> {
switch (am.type) {
case CREATE:
case UPDATE:
case CANCEL: {
...
}
case SUBSCRIBE: {
Flux<ResponseMessage<?>> flux = Flux
.from(
channelsMapping.get(am.getChannelId())
.get(ActionMessage.Type.SUBSCRIBE)
.handle(am) // returns Publisher<>
);
if (flux != null) {
channelsIdsToDisposableMap.compute(
am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
(cid, disposable) -> {
...
return flux
.subscriberContext(context)
.subscribe(
funIn::onNext, // send message to a Processor manually
e -> {
funIn.onNext(
new ResponseMessage<>( // send errors as a messages to Processor here
0,
e.getMessage(),
...
ResponseMessage.Type.ERROR
)
);
}
);
}
);
}
return;
}
case UNSABSCRIBE: {
Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
if (disposable != null) {
disposable.dispose();
}
}
}
})
.then(Mono.empty()),
funIn
...
.map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
.as(session::send)
).then()
);
}
正如我们从上面的示例中看到的,那里有一堆东西:
- 消息应包含路线信息
- 消息应包含与其相关的唯一流 ID。
- 用于消息多路复用的单独处理器,其中错误也应该是一条消息
- 每个频道都应该存储在某个地方,在这种情况下,我们有一个简单的用例,其中每条消息都可以提供
Flux
条消息或只是Mono
(如果是单声道,它可以在服务器端实现更简单,因此您不必保留唯一的流 ID)。 - 此示例不包括消息编码解码,所以这个挑战就留给你了。
客户端
客户端也不简单:
处理会话
为了处理连接,我们必须分配两个处理器,以便进一步使用它们来多路复用和多路分解消息:
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
return Flux.merge(
session.receive()
.subscribeWith(incoming)
.then(Mono.empty()),
session.send(outgoing)
).then();
}
将所有逻辑流保存在某处
所有创建的流,无论是 Mono
还是 Flux
都应该存储在某个地方,以便我们能够区分与哪个流消息相关:
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
自MonoSink以来,我们必须保留两个映射,而FluxSink没有相同的父接口。
消息路由
在上面的示例中,我们只考虑了客户端的初始部分。现在我们要建立一个消息路由机制:
...
.subscribeWith(incoming)
.doOnNext(message -> {
if (monoSinkMap.containsKey(message.getStreamId())) {
MonoSink sink = monoSinkMap.get(message.getStreamId());
monoSinkMap.remove(message.getStreamId());
if (message.getType() == SUCCESS) {
sink.success(message.getData());
}
else {
sink.error(message.getCause());
}
} else if (fluxSinkMap.containsKey(message.getStreamId())) {
FluxSink sink = fluxSinkMap.get(message.getStreamId());
if (message.getType() == NEXT) {
sink.next(message.getData());
}
else if (message.getType() == COMPLETE) {
fluxSinkMap.remove(message.getStreamId());
sink.next(message.getData());
sink.complete();
}
else {
fluxSinkMap.remove(message.getStreamId());
sink.error(message.getCause());
}
}
})
以上代码示例展示了我们如何路由传入消息。
多重请求
最后一部分是消息多路复用。为此,我们将涵盖可能的发件人 class impl:
class Sender {
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
public Sender () {
// 在此处创建 websocket 连接并放置前面提到的代码 }
Mono<R> sendForMono(T data) {
//generate message with unique
return Mono.<R>create(sink -> {
monoSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Mono
});
}
Flux<R> sendForFlux(T data) {
return Flux.<R>create(sink -> {
fluxSinksMap.put(streamId, sink);
outgoing.onNext(message); // send message to server only when subscribed to Flux
});
}
}
自定义实现总结
- 硬核
- 没有实现背压支持,所以这可能是另一个挑战
- 搬起石头砸自己的脚
外卖
- 请使用 RSocket,不要自己发明协议,这很难!!!
- 要从 Pivotal 的家伙那里了解更多关于 RSocket 的信息 - https://www.youtube.com/watch?v=WVnAbv65uCU
- 从我的一次演讲中了解更多关于 RSocket 的信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
- 有一个构建在 RSocket 之上的特色框架,称为 Proteus - 您可能对此感兴趣 - https://www.netifi.com/
- 从 RSocket 协议的核心开发人员那里了解有关 Proteus 的更多信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E