一种从不同队列消费并生产到不同队列的功能

One function to consume from different queues and produce to different queues

我希望使用相同的函数从不同的 RabbitMQ 队列中消费消息,然后向不同的交换器生成消息。

我处理消费和生产的函数

@Bean
public Function<String, String> myFunction() {

    return (x) -> {
        System.out.println(x);
        return "hello world " + x;
    };
}

我的 application.yml 看起来像这样:

spring:
  cloud:
    stream:
      function:
        definition: myFunction
      rabbit:
        bindings:
          myFunction-in-0:
            consumer:
              declareExchange: false
              bindQueue: false
              queueNameGroupOnly: true
              bindingRoutingKey: '"myExchange.test.request"'
          myFunction-out-0:
            producer:
              declareExchange: false
              bindQueue: false
              queueNameGroupOnly: true
              routingKeyExpression: '"myExchange.test.result"'
          myFunction-in-1:
            consumer:
              declareExchange: false
              bindQueue: false
              queueNameGroupOnly: true
              bindingRoutingKey: '"myExchange1.test.request"'
          myFunction-out-1:
            producer:
              declareExchange: false
              bindQueue: false
              queueNameGroupOnly: true
              routingKeyExpression: '"myExchange1.test.result"'
      bindings:
        myFunction-in-0:
          content-type: application/json
          group: MY_QUEUE
        myFunction-out-0:
          content-type: application/json
          destination: myExchange
        myFunction-in-1:
          content-type: application/json
          group: MY_QUEUE1
        myFunction-out-1:
          content-type: application/json
          destination: myExchange1

基本上,我想要实现的是拥有一个功能,在幕后,我希望 Spring Cloud Stream 为我创建两个消费者和两个生产者,而我根本不必更改功能。

我认为这可以通过将绑定 in out 绑定分别更改为 0 1 来实现,但它不起作用。 Spring 云流只为 in-0 绑定创建一个消费者,我的 in-1 绑定被忽略。

有什么想法吗?

Multi input/output 当前 (3.1.3) 支持反应函数。

IMPORTANT: At the moment, function arity is only supported for reactive functions (Function<TupleN<Flux<?>…​>, TupleN<Flux<?>…​>>) centered on Complex event processing where evaluation and computation on confluence of events typically requires view into a stream of events rather than single event.

然而,即使使用反应函数,定义也有点不同,索引与函数 arity 显式耦合。与您的示例相反,不同的绑定对于每个索引可能具有不同的 input/output 类型。因此,函数定义使用 Project Reactor 中的 TupleN 类型,允许指定每个绑定的类型。

让我们让您的示例生效:

@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunction2() {
    return tuple -> {
        var flux = Flux.merge(tuple.getT1(), tuple.getT2())
                .doOnNext(System.out::println)
                .map(x -> "hello world " + x)
                .publish();
        flux.connect();
        return Tuples.of(flux, flux);
    };
}

如果你真的不能改变函数,你可以创建一个响应式包装函数:

@Bean
public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> myFunctionWrapper() {
    return tuple -> {
        var flux = Flux.merge(tuple.getT1(), tuple.getT2())
                .map(myFunction())
                .publish();
        flux.connect();
        return Tuples.of(flux, flux);
    };
}

这是 docs 中的相关部分。