Spring Cloud Stream:@StreamListener 处理消息两次
Spring Cloud Stream: @StreamListener processing messages twice
我正在使用 Spring Cloud Stream (Edgware.SR5) 和 Spring Boot (1.5.10.RELEASE)。我的@StreamListener 对收到的每条消息处理两次。
示例的思路是在队列中发布一条消息并对其进行处理。
服务:
@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {
@Publisher(channel = ExampleBindings.OUTPUT)
public String queue(String message){
return message;
}
@StreamListener(ExampleBindings.INPUT)
public void dequeue(String message){
System.out.println("New message: " + message);
}
}
绑定:
public interface ExampleBindings {
String INPUT = "input1";
String OUTPUT = "output1";
@Input(ExampleBindings.INPUT)
SubscribableChannel input();
@Output(ExampleBindings.OUTPUT)
MessageChannel output();
}
application.properties:
spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1
spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1
spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost
配置(用于在测试中注入代理服务):
@Configuration
public class ExampleConfig {
@Bean
public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
new PublisherAnnotationBeanPostProcessor();
publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
return publisherAnnotationBeanPostProcessor;
}
}
测试:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {
@Autowired
private ExampleService exampleService;
@Test
public void testQueue() throws InterruptedException {
exampleService.queue("Hello!");
Thread.sleep(1000);//Wait for message processing
System.out.println("Ready!");
}
}
我有以下输出:
17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!
我不知道我的配置有什么问题,或者如果是某个错误,有什么建议吗?
谢谢!
已编辑:
我上传了一个(非)工作示例here
您可以使用以下方法创建 RabbitMQ 实例:
docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management
从配置来看,我认为,您正在尝试将相同的消息再次发布到相同的目的地 dest_1
。
spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1
从日志中可以清楚地看出,第二条消息具有不同的 ID
id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888
由于 ExampleConfig 中的配置,我检测到 @Publisher 发布了两次。这个新配置(借自 )似乎工作正常:
@Bean
public static BeanFactoryPostProcessor bfpp() {
return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}
我 运行我的应用程序处于调试模式 (intellij),因此偏移量没有得到更新。在 运行 模式下尝试 运行ning,它解决了我的问题。
我正在使用 Spring Cloud Stream (Edgware.SR5) 和 Spring Boot (1.5.10.RELEASE)。我的@StreamListener 对收到的每条消息处理两次。
示例的思路是在队列中发布一条消息并对其进行处理。
服务:
@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {
@Publisher(channel = ExampleBindings.OUTPUT)
public String queue(String message){
return message;
}
@StreamListener(ExampleBindings.INPUT)
public void dequeue(String message){
System.out.println("New message: " + message);
}
}
绑定:
public interface ExampleBindings {
String INPUT = "input1";
String OUTPUT = "output1";
@Input(ExampleBindings.INPUT)
SubscribableChannel input();
@Output(ExampleBindings.OUTPUT)
MessageChannel output();
}
application.properties:
spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1
spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1
spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost
配置(用于在测试中注入代理服务):
@Configuration
public class ExampleConfig {
@Bean
public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
new PublisherAnnotationBeanPostProcessor();
publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
return publisherAnnotationBeanPostProcessor;
}
}
测试:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {
@Autowired
private ExampleService exampleService;
@Test
public void testQueue() throws InterruptedException {
exampleService.queue("Hello!");
Thread.sleep(1000);//Wait for message processing
System.out.println("Ready!");
}
}
我有以下输出:
17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!
我不知道我的配置有什么问题,或者如果是某个错误,有什么建议吗?
谢谢!
已编辑:
我上传了一个(非)工作示例here
您可以使用以下方法创建 RabbitMQ 实例:
docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management
从配置来看,我认为,您正在尝试将相同的消息再次发布到相同的目的地 dest_1
。
spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1
从日志中可以清楚地看出,第二条消息具有不同的 ID
id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888
由于 ExampleConfig 中的配置,我检测到 @Publisher 发布了两次。这个新配置(借自
@Bean
public static BeanFactoryPostProcessor bfpp() {
return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}
我 运行我的应用程序处于调试模式 (intellij),因此偏移量没有得到更新。在 运行 模式下尝试 运行ning,它解决了我的问题。