未使用轮询器和服务激活器拉取 pubsub 消息
pubsub messages not being pulled with poller and serviceactivator
我一直在尝试让 pubsub 在 spring 应用程序中工作。为了起床 运行ning,我一直在阅读 this
等教程和文档
我可以构建和启动东西,但如果我通过云控制台向测试订阅发送消息,它永远不会到达。
这就是我的代码现在的样子:
@Configuration
@Import({GcpPubSubAutoConfiguration.class})
public class PubSubConfigurator {
@Bean
public GcpProjectIdProvider projectIdProvider(){
return () -> "project-id";
}
@Bean
public CredentialsProvider credentialsProvider(){
return GoogleCredentials::getApplicationDefault;
}
@Bean
public MessageChannel inputMessageChannel() {
return new PublishSubscribeChannel();
}
@Bean
@InboundChannelAdapter(channel = "inputMessageChannel", poller = @Poller(fixedDelay = "5"))
public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "tst-sandbox");
messageSource.setAckMode(AckMode.MANUAL);
messageSource.setPayloadType(String.class);
messageSource.setBlockOnPull(false);
messageSource.setMaxFetchSize(10);
//pubSubTemplate.pull("tst-sandbox", 10, true);
return messageSource;
}
// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
System.out.println("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
message.ack();
}
}
我的想法是,轮询器注释会每隔一段时间启动一个轮询器 运行 来检查消息并将它们发送到用服务激活器注释的方法,但显然情况并非如此,因为它从来没有命中.
有趣的是,如果我在“return messageSource”之前放置一个断点并检查 template.pull 调用的结果,消息是 returned,所以这似乎不是问题与连接本身。
我在这里错过了什么?教程和文档在这一点上没有多大帮助,因为它们都使用与上面几乎相同的教程代码......
我尝试了上述代码的变体,例如创建适配器而不是消息源,如下所示:
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
@Qualifier("inputMessageChannel") MessageChannel messageChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "tst-sandbox");
adapter.setOutputChannel(messageChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(String.class);
return adapter;
}
无济于事。欢迎提出任何建议。
从头开始创建spring引导项目后发现问题(主项目正常spring)。在调试输出中注意到它自动启动服务激活器 bean 和其他一些事情,比如实际订阅它在主项目中没有做的频道。
经过快速 google 解决方案很简单,必须添加
@EnableIntegration
注释在 class 级别,消息开始传入。
我一直在尝试让 pubsub 在 spring 应用程序中工作。为了起床 运行ning,我一直在阅读 this
等教程和文档我可以构建和启动东西,但如果我通过云控制台向测试订阅发送消息,它永远不会到达。
这就是我的代码现在的样子:
@Configuration
@Import({GcpPubSubAutoConfiguration.class})
public class PubSubConfigurator {
@Bean
public GcpProjectIdProvider projectIdProvider(){
return () -> "project-id";
}
@Bean
public CredentialsProvider credentialsProvider(){
return GoogleCredentials::getApplicationDefault;
}
@Bean
public MessageChannel inputMessageChannel() {
return new PublishSubscribeChannel();
}
@Bean
@InboundChannelAdapter(channel = "inputMessageChannel", poller = @Poller(fixedDelay = "5"))
public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "tst-sandbox");
messageSource.setAckMode(AckMode.MANUAL);
messageSource.setPayloadType(String.class);
messageSource.setBlockOnPull(false);
messageSource.setMaxFetchSize(10);
//pubSubTemplate.pull("tst-sandbox", 10, true);
return messageSource;
}
// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
System.out.println("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
message.ack();
}
}
我的想法是,轮询器注释会每隔一段时间启动一个轮询器 运行 来检查消息并将它们发送到用服务激活器注释的方法,但显然情况并非如此,因为它从来没有命中.
有趣的是,如果我在“return messageSource”之前放置一个断点并检查 template.pull 调用的结果,消息是 returned,所以这似乎不是问题与连接本身。
我在这里错过了什么?教程和文档在这一点上没有多大帮助,因为它们都使用与上面几乎相同的教程代码......
我尝试了上述代码的变体,例如创建适配器而不是消息源,如下所示:
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
@Qualifier("inputMessageChannel") MessageChannel messageChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "tst-sandbox");
adapter.setOutputChannel(messageChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(String.class);
return adapter;
}
无济于事。欢迎提出任何建议。
从头开始创建spring引导项目后发现问题(主项目正常spring)。在调试输出中注意到它自动启动服务激活器 bean 和其他一些事情,比如实际订阅它在主项目中没有做的频道。
经过快速 google 解决方案很简单,必须添加
@EnableIntegration
注释在 class 级别,消息开始传入。