无法通过 SSE 向所有连接发送通知

Unable to send notification to all connection through SSE

我正在尝试通过服务器发送的事件将事件通知所有用户,但不知道为什么,但它不起作用。如果我使用队列,我可以通知单个用户,但如果我更改配置以使用主题,它什么也不做。你在下面看到我正在使用队列并在 jmsTopicTemplate 中注释掉 template.setPubSubDomain(true) 然后它发送给单个用户

@Bean
    public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination(JMSConfig.Location_TOPIC).
                        jmsMessageConverter(messageConverter()))
                //.channel(new PublishSubscribeChannel(executor()))                 
//              .channel(MessageChannels.flux())
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

您可以在 https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app

查看代码

好吧,不幸的是,我已经检查了你的项目,它仍然足够大,可以正常消化。 但是我从集成的角度来看应该是这样的:

return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(JMSConfig.Location_TOPIC).
                            jmsMessageConverter(messageConverter())
                    .autoStartup(false)
                    .id("jmsMessageDrivenChannelAdapter"))
            .toReactivePublisher();

toReactivePublisher() 已经自己注入了频道。中间不需要任何其他。 autoStartup(false) 用于延迟订阅最终 Flux。这样你就不会从 JMS 中提取消息,直到最后 Flux 发生订阅。

您稍后将在 LocationService 中使用的 .id("jmsMessageDrivenChannelAdapter")) 为此:

public Flux<LocationData> watch() {
    return Flux.from(jmsReactiveSource)
            .map(Message::getPayload)
            .doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}

这样,在 Flux.

中发生真正的订阅之前,您不会开始从 JMS 中提取数据

JMS 主题与此 SSE 主题无关。

如果你能让你的项目更简单,我会再试一次。 不过我对龙目岛不熟悉...

更新

使用当前的解决方案,您需要这样:

.channel(MessageChannels.flux())
.toReactivePublisher();

在消息驱动之后使用常规 .toReactivePublisher() 的问题,我们只能为最终发布者获得一个订阅者。

要使其成为 pub-sub,您肯定需要在两者之间放置一个 FluxMessageChannel。这样,您所有的 SEE 都将发送给所有 JavaScript 订阅者。