将自定义 Headers 添加到 Spring Cloud Stream(使用 Spring Reactor)
Add Custom Headers to Spring Cloud Stream (with Spring Reactor)
作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)来传输数据。
在将消息发送到 queue 之前,我需要添加一些自定义 headers。
我的spring-cloud-stream的配置是:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
制作人参考:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
Headers 已成功添加到消息中,但它在某处被覆盖并且未传播给消费者。
有人可以分享他们对我们如何使用 Spring Cloud Stream 向消息添加自定义 headers 的想法。
提前致谢!
请升级到 Hoxton.SR2,它将带来 spring-cloud-stream 3.0.2.RELEASE。有一些更新,但简而言之,您正在生成的消息和其中的 header 应该被保留。
旁注:
此外,由于增加了对多个 in/out 函数参数的支持,我们不得不更新函数的绑定名称约定。您可以阅读更多关于它的信息 here,但它对您来说意味着您的配置需要快速更新,因为默认情况下不再使用 input
和 output
,因此您应该使用派生的名称来自函数名
spring:
cloud:
stream:
bindings:
processMessageaddHeaders-in-0:
binder: rabbitInput
destination: inputDestination
processMessageaddHeaders-out-0:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
。 . .或者您可以将派生的绑定名称映射到更具描述性的名称(例如,input
、output
等)并改用该名称
spring:
cloud:
stream:
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
bindings:
processMessageaddHeaders-in-0: input
processMessageaddHeaders-out-0: output
作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)来传输数据。 在将消息发送到 queue 之前,我需要添加一些自定义 headers。
我的spring-cloud-stream的配置是:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
制作人参考:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
Headers 已成功添加到消息中,但它在某处被覆盖并且未传播给消费者。
有人可以分享他们对我们如何使用 Spring Cloud Stream 向消息添加自定义 headers 的想法。
提前致谢!
请升级到 Hoxton.SR2,它将带来 spring-cloud-stream 3.0.2.RELEASE。有一些更新,但简而言之,您正在生成的消息和其中的 header 应该被保留。
旁注:
此外,由于增加了对多个 in/out 函数参数的支持,我们不得不更新函数的绑定名称约定。您可以阅读更多关于它的信息 here,但它对您来说意味着您的配置需要快速更新,因为默认情况下不再使用 input
和 output
,因此您应该使用派生的名称来自函数名
spring:
cloud:
stream:
bindings:
processMessageaddHeaders-in-0:
binder: rabbitInput
destination: inputDestination
processMessageaddHeaders-out-0:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
。 . .或者您可以将派生的绑定名称映射到更具描述性的名称(例如,input
、output
等)并改用该名称
spring:
cloud:
stream:
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
bindings:
processMessageaddHeaders-in-0: input
processMessageaddHeaders-out-0: output