Queue-listener-like behavior using Spring Cloud Stream
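I am trying to implement the above scenario using Spring Cloud Stream suppliers and consumers.
- The app is a single Spring Boot application that contains both the producer and the consumers.
- There is one producer and there can be multiple consumers. All consumers should act as clients of one queue (that is, a single message should be received by only one consumer), with the other consumers receiving different messages.
Below is the Java class.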
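import java.util.Date;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class MultipleFunctionsApplication {

    // First consumer function; bound to "requester1" in the configuration.
    @Bean
    public Consumer<String> sink1() {
        return message -> {
            System.out.println(new Date() + "----------->>> sink1 - Received message " + message);
        };
    }

    // Second consumer function; bound to "requester2" in the configuration.
    @Bean
    public Consumer<String> sink2() {
        return message -> {
            System.out.println(new Date() + "----------->>> sink2 - Received message " + message);
        };
    }
}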
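To make the behavior I am after concrete, here is a minimal plain-Java sketch of competing consumers on one shared queue. It does not use Spring Cloud Stream or RabbitMQ at all; the class name `CompetingConsumersSketch`, the `run` method, and the `__stop__` marker are hypothetical names used only for illustration. It shows the semantics I expect from the two sinks: every message is handled by exactly one consumer.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class CompetingConsumersSketch {

    // Marker that tells a worker to stop; assumed never to be a real message.
    private static final String STOP = "__stop__";

    /** Starts consumerCount workers on one shared queue and returns what each one received. */
    static Map<String, List<String>> run(List<String> messages, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, List<String>> received = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++) {
            List<String> mine = new CopyOnWriteArrayList<>();
            received.put("sink" + (i + 1), mine);
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        // take() removes the element, so no other worker can see it.
                        String message = queue.take();
                        if (STOP.equals(message)) {
                            return;
                        }
                        mine.add(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        try {
            for (String message : messages) {
                queue.put(message);
            }
            // One stop marker per worker, queued after all real messages.
            for (int i = 0; i < consumerCount; i++) {
                queue.put(STOP);
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while dispatching", e);
        }
        return received;
    }

    public static void main(String[] args) {
        run(List.of("m1", "m2", "m3"), 2)
            .forEach((name, msgs) -> System.out.println(name + " received " + msgs));
    }
}
```

Which sink gets which message depends on thread scheduling, so the split varies between runs; only the total count and the "one consumer per message" property are guaranteed.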
I am trying to achieve this with the consumer group feature, as shown below.
spring:
  cloud:
    stream:
      bindings:
        requester1:
          destination: rss-exchange
          group: requester
        requester2:
          destination: rss-exchange
          group: requester
      function:
        bindings:
          sink1-in-0: requester1
          sink2-in-0: requester2
        definition: sink1;sink2
  application:
    name: rss
When I start the application, I get the following error.
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'rss-exchange.requester.errors.recoverer' defined in null: Cannot register bean definition [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'rss-exchange.requester.errors.recoverer': There is already [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:995) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:330) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.beans.factory.support.BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionReaderUtils.java:164) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.doRegisterBean(AnnotatedBeanDefinitionReader.java:285) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.registerBean(AnnotatedBeanDefinitionReader.java:233) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotationConfigApplicationContext.registerBean(AnnotationConfigApplicationContext.java:198) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:687) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:525) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:136) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
... 24 common frames omitted
From the log it is clear that it is trying to create 'rss-exchange.requester.errors.recoverer' a second time. In this case only sink1 starts, with the following messages.
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}
When I add "allow-bean-definition-overriding: true", everything works, as the log below shows.
Fri Aug 27 15:03:57 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}
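For reference, the setting mentioned above is the standard Spring Boot property `spring.main.allow-bean-definition-overriding`. A minimal fragment, assuming it sits in the same application.yml as the bindings, would look like this:

```yaml
spring:
  main:
    # Lets a later bean definition replace an earlier one with the same name
    # instead of failing at startup with BeanDefinitionOverrideException.
    allow-bean-definition-overriding: true
```

This only hides the duplicate registration of 'rss-exchange.requester.errors.recoverer'; it does not stop the binder from trying to register it twice.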
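I am not sure whether this is the right way to do it: the use case I am trying does work with the override property, but I still get the error that the bean already exists.
Note: I started exploring Spring Cloud Stream only a few days ago, so please bear with me if this is a naive question.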
So the issue has been fixed, tested with your configuration, and merged, and it is available in the current snapshot (3.2.0-SNAPSHOT).