将自定义 Headers 添加到 Spring Cloud Stream(使用 Spring Reactor)

Add Custom Headers to Spring Cloud Stream (with Spring Reactor)

作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)来传输数据。 在将消息发送到 queue 之前,我需要添加一些自定义 headers。

我的spring-cloud-stream的配置是:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

制作人参考:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

Headers 已成功添加到消息中,但它在某处被覆盖并且未传播给消费者。

有人可以分享他们对我们如何使用 Spring Cloud Stream 向消息添加自定义 headers 的想法。

提前致谢!

请升级到 Hoxton.SR2,它将带来 spring-cloud-stream 3.0.2.RELEASE。有一些更新,但简而言之,您正在生成的消息和其中的 header 应该被保留。

旁注: 此外,由于增加了对多个 in/out 函数参数的支持,我们不得不更新函数的绑定名称约定。您可以阅读更多关于它的信息 here,但它对您来说意味着您的配置需要快速更新,因为默认情况下不再使用 inputoutput,因此您应该使用派生的名称来自函数名

spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

。 . .或者您可以将派生的绑定名称映射到更具描述性的名称(例如,inputoutput 等)并改用该名称

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings: 
          processMessageaddHeaders-in-0: input  
          processMessageaddHeaders-out-0: output