无法过滤使用 Spring Cloud Stream @StreamListener 注释中的条件属性接收的消息

Not able to to filter messages received using condition attribute in Spring Cloud Stream @StreamListener annotation

我正在尝试创建一个基于事件的系统,用于在使用 Apache Kafka 作为消息系统和 Spring Cloud Stream Kafka 的服务之间进行通信。

我已经编写了我的 Receiver class 方法如下,

    @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
    public void handleEmployeeCreatedEvent(@Payload String payload) {
        logger.info("Received EmployeeCreatedEvent: " + payload);
    }

该方法专门用于捕获与EmployeeCreatedEvent相关的消息或事件。

    @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
    public void handleEmployeeTransferredEvent(@Payload String payload) {
        logger.info("Received EmployeeTransferredEvent: " + payload);
    }

该方法专门用于捕获与EmployeeTransferredEvent相关的消息或事件。

    @StreamListener(target = Sink.INPUT)
    public void handleDefaultEvent(@Payload String payload) {
        logger.info("Received payload: " + payload);
    }

这是默认方法。

当我 运行 应用程序时,我无法看到使用条件属性注释的方法被调用。我只看到 handleDefaultEvent 方法被调用。

我正在使用下面的 CustomMessageSource class 从 Sending/Source 应用程序向此接收器应用程序发送消息,如下所示,

@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
    @Autowired
    private Source source;                


    public void  sendMessage(String payload,String eventType) {
        Message<String> myMessage = MessageBuilder.withPayload(payload)
                .setHeader("eventType", eventType)
                .build();
        source.output().send(myMessage);

     }

}

我在 Source App 中从我的控制器调用方法,如下所示,

customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");

customMessageSource 实例自动装配如下,

@Autowired
CustomMessageSource customMessageSource;

基本上,我想过滤 Sink/Receiver 应用程序收到的消息并相应地处理它们。

为此,我使用带有条件属性的@StreamListener 注释来模拟处理不同事件的行为。

我正在使用 Spring Cloud Stream Chelsea.SR2 版本。

谁能帮我解决这个问题。

似乎 headers 没有传播。确保在 spring.cloud.stream.kafka.binder.headers http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties 中包含自定义 headers 。