Spring 5 个反应性 websockets:客户端没有从热流中接收到相同的数据

Spring 5 reactive websockets: Clients not receiving same data from hot stream

我的 WebSocketHandler 实现中有这个:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                         try {
                            sink.next(objectMapper.writeValueAsString(o));
                         } catch (JsonProcessingException e) {
                            e.printStackTrace();                               
                         }
                      })
                      .map(session::textMessage);

                  return publisher;
              })
    );
}

目前在服务中测试生成Flux<EfficiencyData>如下:

public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {
    return Flux.interval(Duration.ofSeconds(1))
               .map(aLong -> {
                   longAdder.increment();
                   return new EfficiencyData(new MachineSpeed(
                           RotationSpeed.ofRpm(longAdder.intValue()),
                           RotationSpeed.ofRpm(0),
                           RotationSpeed.ofRpm(400)));
               }).publish().autoConnect();
}

我正在使用 publish().autoConnect() 使其成为热门流。我创建了一个单元测试,它启动了 2 个线程,这些线程在返回的 Flux:

上执行此操作
flux.log().handle((s, sink) -> {
            LOGGER.info("{}", s.getMachineSpeed().getCurrent());
        }).subscribe();

在这种情况下,我看到两个线程每秒打印出相同的值。

但是,当我打开 2 个浏览器选项卡时,我在两个网页中看到的值不同。连接的 websocket 客户端越多,值就越不同(因此原始 Flux 中的每个值似乎都发送到不同的客户端,而不是发送到所有客户端)。

感谢 Brian Clozel on twitter 解决了这个问题。

问题是对于每个连接的 websocket 客户端,我调用 service.subscribeToEfficiencyData(id) 方法,每次调用时 returns 一个 new Flux。因此,当然,那些独立的 Flux 不会在不同的 websocket 客户端之间共享。

为了解决这个问题,我在构造函数中创建了 Flux 实例,或者在我的服务的 PostConstruct 方法中创建了 subscribeToEfficiencyData returns 相同的 Flux 实例时间.

请注意,.publish().autoConnect() 在 Flux 上仍然很重要,因为没有那个 websocket 客户端将再次看到不同的值!