Concat Flux 及其计数(或另一个 reduce 函数)

Concat Flux with its count (or another reduce function)

是否有可能继续(连接)带有一些减少功能的 Flux,例如传递元素的数量?

也就是比如转一个

Flux.fromStream(Stream.of("a", "b", "c")) //note source flux could be read only once 

到将在

中评估的 Flux
"a", "b", "c", "3"

由于您的通量只能读取一次,因此我不得不采取以下解决方法:

我这样定义我的映射器:

public class MyMapper implements Function<String, String> {

  private AtomicInteger atomicInteger;

  public MyMapper(AtomicInteger atomicInteger) {
    this.atomicInteger = atomicInteger;
  }

  @Override
  public String apply(String s) {
    atomicInteger.incrementAndGet();
    return s;
  }

}


最后主要代码如下所示:

  public static void main(String[] args) {
    AtomicInteger count = new AtomicInteger();
    MyMapper mapper = new MyMapper(count);
    Flux<String> f = Flux.fromStream(Stream.of("a", "b", "c"));
    Flux<String> output = f.map(mapper).concatWith(Mono.fromCallable(() -> Integer.toString(count.get())));
    output.doOnNext(System.out::println).blockLast();

  }

打印出来:

a
b
c
3

您的问题不需要“减少”功能。 您可以使用以下方法简单地实现此目的:

  public Flux<String> concat(Flux<String> sourceFlux){
    Mono<String> m = sourceFlux.collectList().map(List::size).map(count -> Integer.toString(count));
    return sourceFlux.concatWith(m);
  }

例如:

public static void main(String[] args) {
  Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));

  Flux<String> fork = source.publish().autoConnect(2);
  ConnectableFlux<Long> counter = fork.count().flux().replay(1);
  counter.connect();

  Flux<String> result = fork.concatWith(counter.map(Object::toString));

  result.subscribe(System.out::println);
}

备注:

  1. .publish()用于向多个订阅者广播相同的序列。 autoConnect(2) 防止提前订阅和使用序列。预计正好有 2 个订阅者。
  2. 众所周知,Flux.concat() 延迟订阅:第一个序列完成后会创建第二个订阅,但在您的情况下,您需要同时为同一源序列设置两个活动订阅者。这可以通过创建 ConnectableFlux 并显式调用其 connect() 方法来解决。 (没有 ConnectableMono class,因此计数器从 Mono 转换为 Flux)。该值是提前获取并缓存的,Flux.concat()次订阅后会重放。

关于 Flux.concat() 的问题及其懒惰:我刚刚注意到 Flux.mergeSequential() 被记录为热切订阅的 concat 的替代品。

因此我的代码可以通过使用 mergeSequential() 而不是 concat() 来简化:

public static void main(String[] args) {
  Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));

  Flux<String> fork = source.publish().autoConnect(2);
  Mono<Long> counter = fork.count();

  Flux<String> result = Flux.mergeSequential(fork, counter.map(Object::toString));

  result.subscribe(System.out::println);
}