无法过滤使用 Spring Cloud Stream @StreamListener 注释中的条件属性接收的消息
Not able to to filter messages received using condition attribute in Spring Cloud Stream @StreamListener annotation
我正在尝试创建一个基于事件的系统,用于在使用 Apache Kafka 作为消息系统和 Spring Cloud Stream Kafka 的服务之间进行通信。
我已经编写了我的 Receiver class 方法如下,
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}
该方法专门用于捕获与EmployeeCreatedEvent相关的消息或事件。
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}
该方法专门用于捕获与EmployeeTransferredEvent相关的消息或事件。
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}
这是默认方法。
当我 运行 应用程序时,我无法看到使用条件属性注释的方法被调用。我只看到 handleDefaultEvent 方法被调用。
我正在使用下面的 CustomMessageSource class 从 Sending/Source 应用程序向此接收器应用程序发送消息,如下所示,
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;
public void sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}
我在 Source App 中从我的控制器调用方法,如下所示,
customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");
customMessageSource 实例自动装配如下,
@Autowired
CustomMessageSource customMessageSource;
基本上,我想过滤 Sink/Receiver 应用程序收到的消息并相应地处理它们。
为此,我使用带有条件属性的@StreamListener 注释来模拟处理不同事件的行为。
我正在使用 Spring Cloud Stream Chelsea.SR2 版本。
谁能帮我解决这个问题。
似乎 headers 没有传播。确保在 spring.cloud.stream.kafka.binder.headers
http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties 中包含自定义 headers 。
我正在尝试创建一个基于事件的系统,用于在使用 Apache Kafka 作为消息系统和 Spring Cloud Stream Kafka 的服务之间进行通信。
我已经编写了我的 Receiver class 方法如下,
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}
该方法专门用于捕获与EmployeeCreatedEvent相关的消息或事件。
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}
该方法专门用于捕获与EmployeeTransferredEvent相关的消息或事件。
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}
这是默认方法。
当我 运行 应用程序时,我无法看到使用条件属性注释的方法被调用。我只看到 handleDefaultEvent 方法被调用。
我正在使用下面的 CustomMessageSource class 从 Sending/Source 应用程序向此接收器应用程序发送消息,如下所示,
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;
public void sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}
我在 Source App 中从我的控制器调用方法,如下所示,
customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");
customMessageSource 实例自动装配如下,
@Autowired
CustomMessageSource customMessageSource;
基本上,我想过滤 Sink/Receiver 应用程序收到的消息并相应地处理它们。
为此,我使用带有条件属性的@StreamListener 注释来模拟处理不同事件的行为。
我正在使用 Spring Cloud Stream Chelsea.SR2 版本。
谁能帮我解决这个问题。
似乎 headers 没有传播。确保在 spring.cloud.stream.kafka.binder.headers
http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties 中包含自定义 headers 。