如何对 Flux 中的项目进行计数,如果计数大于 X,则 return 错误,否则继续管道

How to Count Items in a Flux, return error if count is greater than X, else continue with Pipeline

我是 Spring 中的 Project Reactor 新手,我不太确定如何执行某些操作:

我的管道有管道 returns 记录。一切顺利。

但我想计算这些记录然后做一些事情(比如 if else),如果返回的记录 > X 则错误,否则继续。

知道计数returns一个Mono<Long>,然后我就失去了记录,我该怎么办?

我在想:

以某种方式使用 flatMap 并在此平面图中执行某些操作。 不知何故,我发现 Flux 中有一个 reduce 方法可能会有所帮助。

问题是,我不确定如何进行。

不完全确定你想要什么,所以将根据假设提供两个建议

1.. 您想收集所有元素然后评估是否超过 n,如果超过则抛出错误。您可以使用 collectList,对元素进行计数,然后在任何情况下转换回通量。如果总数低于限制,这只会对任何元素 doStuff

    Flux.range(1,10)
            .collectList()
            .flatMap(s -> 
                s.size()>7 
                    ? Mono.error(new RuntimeException("TOO MANY!")) 
                    : Mono.just(s))
            .flatMapMany(Flux::fromIterable)
            .map(this::doStuff)

2.. 您想要动态评估元素的数量,您可以使用外部原子计数器来实现。这将 doStuff 每个元素直到有问题的元素。

    AtomicLong count = new AtomicLong();

    Flux.range(1,10)
            .flatMap(s -> 
                count.incrementAndGet() > 7 
                    ? Flux.error(new RuntimeException("TOO MANY!")) 
                    : Flux.just(s))
             .map(this::doStuff);

我也是 Spring Reactor 和反应式编程的新手,但我自己试了一下,这对我有用,返回了通量元素的 Long 值:

fluxObject.count().block().longValue()

在这种情况下,您也可以使用 shortValue()