无法通过 SSE 向所有连接发送通知
Unable to send notification to all connection through SSE
我正在尝试通过服务器发送的事件将事件通知所有用户,但不知道为什么,但它不起作用。如果我使用队列,我可以通知单个用户,但如果我更改配置以使用主题,它什么也不做。你在下面看到我正在使用队列并在 jmsTopicTemplate 中注释掉 template.setPubSubDomain(true) 然后它发送给单个用户
@Bean
public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter()))
//.channel(new PublishSubscribeChannel(executor()))
// .channel(MessageChannels.flux())
.channel(MessageChannels.queue())
.toReactivePublisher();
}
您可以在 https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app
查看代码
好吧,不幸的是,我已经检查了你的项目,它仍然足够大,可以正常消化。
但是我从集成的角度来看应该是这样的:
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter())
.autoStartup(false)
.id("jmsMessageDrivenChannelAdapter"))
.toReactivePublisher();
toReactivePublisher()
已经自己注入了频道。中间不需要任何其他。 autoStartup(false)
用于延迟订阅最终 Flux
。这样你就不会从 JMS 中提取消息,直到最后 Flux
发生订阅。
您稍后将在 LocationService
中使用的 .id("jmsMessageDrivenChannelAdapter"))
为此:
public Flux<LocationData> watch() {
return Flux.from(jmsReactiveSource)
.map(Message::getPayload)
.doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}
这样,在 Flux
.
中发生真正的订阅之前,您不会开始从 JMS 中提取数据
JMS 主题与此 SSE 主题无关。
如果你能让你的项目更简单,我会再试一次。
不过我对龙目岛不熟悉...
更新
使用当前的解决方案,您需要这样:
.channel(MessageChannels.flux())
.toReactivePublisher();
在消息驱动之后使用常规 .toReactivePublisher()
的问题,我们只能为最终发布者获得一个订阅者。
要使其成为 pub-sub,您肯定需要在两者之间放置一个 FluxMessageChannel
。这样,您所有的 SEE 都将发送给所有 JavaScript 订阅者。
我正在尝试通过服务器发送的事件将事件通知所有用户,但不知道为什么,但它不起作用。如果我使用队列,我可以通知单个用户,但如果我更改配置以使用主题,它什么也不做。你在下面看到我正在使用队列并在 jmsTopicTemplate 中注释掉 template.setPubSubDomain(true) 然后它发送给单个用户
@Bean
public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter()))
//.channel(new PublishSubscribeChannel(executor()))
// .channel(MessageChannels.flux())
.channel(MessageChannels.queue())
.toReactivePublisher();
}
您可以在 https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app
查看代码好吧,不幸的是,我已经检查了你的项目,它仍然足够大,可以正常消化。 但是我从集成的角度来看应该是这样的:
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter())
.autoStartup(false)
.id("jmsMessageDrivenChannelAdapter"))
.toReactivePublisher();
toReactivePublisher()
已经自己注入了频道。中间不需要任何其他。 autoStartup(false)
用于延迟订阅最终 Flux
。这样你就不会从 JMS 中提取消息,直到最后 Flux
发生订阅。
您稍后将在 LocationService
中使用的 .id("jmsMessageDrivenChannelAdapter"))
为此:
public Flux<LocationData> watch() {
return Flux.from(jmsReactiveSource)
.map(Message::getPayload)
.doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}
这样,在 Flux
.
JMS 主题与此 SSE 主题无关。
如果你能让你的项目更简单,我会再试一次。 不过我对龙目岛不熟悉...
更新
使用当前的解决方案,您需要这样:
.channel(MessageChannels.flux())
.toReactivePublisher();
在消息驱动之后使用常规 .toReactivePublisher()
的问题,我们只能为最终发布者获得一个订阅者。
要使其成为 pub-sub,您肯定需要在两者之间放置一个 FluxMessageChannel
。这样,您所有的 SEE 都将发送给所有 JavaScript 订阅者。