一种从不同队列消费并生产到不同队列的功能
One function to consume from different queues and produce to different queues
我希望使用相同的函数从不同的 RabbitMQ 队列中消费消息,然后向不同的交换器生成消息。
我处理消费和生产的函数
@Bean
public Function<String, String> myFunction() {
return (x) -> {
System.out.println(x);
return "hello world " + x;
};
}
我的 application.yml
看起来像这样:
spring:
cloud:
stream:
function:
definition: myFunction
rabbit:
bindings:
myFunction-in-0:
consumer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
bindingRoutingKey: '"myExchange.test.request"'
myFunction-out-0:
producer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
routingKeyExpression: '"myExchange.test.result"'
myFunction-in-1:
consumer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
bindingRoutingKey: '"myExchange1.test.request"'
myFunction-out-1:
producer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
routingKeyExpression: '"myExchange1.test.result"'
bindings:
myFunction-in-0:
content-type: application/json
group: MY_QUEUE
myFunction-out-0:
content-type: application/json
destination: myExchange
myFunction-in-1:
content-type: application/json
group: MY_QUEUE1
myFunction-out-1:
content-type: application/json
destination: myExchange1
基本上,我想要实现的是拥有一个功能,在幕后,我希望 Spring Cloud Stream 为我创建两个消费者和两个生产者,而我根本不必更改功能。
我认为这可以通过将绑定 in out 绑定分别更改为 0 1 来实现,但它不起作用。 Spring 云流只为 in-0
绑定创建一个消费者,我的 in-1
绑定被忽略。
有什么想法吗?
Multi input/output 当前 (3.1.3) 仅 支持反应函数。
IMPORTANT: At the moment, function arity is only supported for reactive functions (Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>>)
centered on Complex event processing where evaluation and computation on confluence of events typically requires view into a stream of events rather than single event.
然而,即使使用反应函数,定义也有点不同,索引与函数 arity 显式耦合。与您的示例相反,不同的绑定对于每个索引可能具有不同的 input/output 类型。因此,函数定义使用 Project Reactor 中的 TupleN
类型,允许指定每个绑定的类型。
让我们让您的示例生效:
@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunction2() {
return tuple -> {
var flux = Flux.merge(tuple.getT1(), tuple.getT2())
.doOnNext(System.out::println)
.map(x -> "hello world " + x)
.publish();
flux.connect();
return Tuples.of(flux, flux);
};
}
如果你真的不能改变函数,你可以创建一个响应式包装函数:
@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunctionWrapper() {
return tuple -> {
var flux = Flux.merge(tuple.getT1(), tuple.getT2())
.map(myFunction())
.publish();
flux.connect();
return Tuples.of(flux, flux);
};
}
这是 docs 中的相关部分。
我希望使用相同的函数从不同的 RabbitMQ 队列中消费消息,然后向不同的交换器生成消息。
我处理消费和生产的函数
@Bean
public Function<String, String> myFunction() {
return (x) -> {
System.out.println(x);
return "hello world " + x;
};
}
我的 application.yml
看起来像这样:
spring:
cloud:
stream:
function:
definition: myFunction
rabbit:
bindings:
myFunction-in-0:
consumer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
bindingRoutingKey: '"myExchange.test.request"'
myFunction-out-0:
producer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
routingKeyExpression: '"myExchange.test.result"'
myFunction-in-1:
consumer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
bindingRoutingKey: '"myExchange1.test.request"'
myFunction-out-1:
producer:
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
routingKeyExpression: '"myExchange1.test.result"'
bindings:
myFunction-in-0:
content-type: application/json
group: MY_QUEUE
myFunction-out-0:
content-type: application/json
destination: myExchange
myFunction-in-1:
content-type: application/json
group: MY_QUEUE1
myFunction-out-1:
content-type: application/json
destination: myExchange1
基本上,我想要实现的是拥有一个功能,在幕后,我希望 Spring Cloud Stream 为我创建两个消费者和两个生产者,而我根本不必更改功能。
我认为这可以通过将绑定 in out 绑定分别更改为 0 1 来实现,但它不起作用。 Spring 云流只为 in-0
绑定创建一个消费者,我的 in-1
绑定被忽略。
有什么想法吗?
Multi input/output 当前 (3.1.3) 仅 支持反应函数。
IMPORTANT: At the moment, function arity is only supported for reactive functions
(Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>>)
centered on Complex event processing where evaluation and computation on confluence of events typically requires view into a stream of events rather than single event.
然而,即使使用反应函数,定义也有点不同,索引与函数 arity 显式耦合。与您的示例相反,不同的绑定对于每个索引可能具有不同的 input/output 类型。因此,函数定义使用 Project Reactor 中的 TupleN
类型,允许指定每个绑定的类型。
让我们让您的示例生效:
@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunction2() {
return tuple -> {
var flux = Flux.merge(tuple.getT1(), tuple.getT2())
.doOnNext(System.out::println)
.map(x -> "hello world " + x)
.publish();
flux.connect();
return Tuples.of(flux, flux);
};
}
如果你真的不能改变函数,你可以创建一个响应式包装函数:
@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunctionWrapper() {
return tuple -> {
var flux = Flux.merge(tuple.getT1(), tuple.getT2())
.map(myFunction())
.publish();
flux.connect();
return Tuples.of(flux, flux);
};
}
这是 docs 中的相关部分。