在 Reactor Flux 中验证流的开始

Validate beginning of stream in Reactor Flux

使用 Reactor,我正在尝试验证冷 Flux 流的开始,然后成为直通。

例如,假设我需要验证前 N 个元素。如果(且仅当)它通过,则转发这些和更多元素。如果失败,只会发出错误。

这就是我目前所拥有的。它有效,但是有没有更好或更正确的方法来做到这一点?我很想实现自己的运算符,但有人告诉我这很复杂,不推荐。

flux
.bufferUntil(new Predicate<>() {
    private int count = 0;

    @Override
    public boolean test(T next) {
        return ++count >= N;
    }
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
    s.next(cur);
    return cur + 1;
}))
.map(t -> {
    if (t.getT2() == 0 && !validate(t.getT1()))
        throw new RuntimeException("Invalid");
    return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())

我本可以使用 doOnNext 而不是第二个 map,因为它不映射任何内容,但我不确定它是否可以接受使用 peek 方法。

我也可以在第二个 map 到 运行 中只使用一次有状态映射器,而不是用索引压缩,我想这是可以接受的,因为我已经在使用有状态谓词了.. .

您的要求听起来很有趣!我们有 switchOnFirst 这可能对验证第一个元素很有用。但是如果你有 N 个元素要验证,我们可以尝试这样的事情。

这里我假设我必须验证前 5 个元素应该 <= 5。然后它是一个有效的流。否则我们会简单地抛出错误,说验证失败。

Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));

integerFlux
        .buffer(5)
        .switchOnFirst((signal, flux) -> {
            //first 5 elements are <= 5, then it is a valid stream
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                System.out::println);   

然而,这种方法并不好,因为它每次都会收集 5 个元素,即使在我们可能不希望的第一次验证完成后也是如此。

为了避免在验证后缓冲 N 个元素,我们可以使用 bufferUntil。一旦我们收集了前 N 个元素并进行了验证,它就会在接收到时将第 1 个元素传递给下游。

AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
        .bufferUntil(i -> {
            if(atomicInteger.get() < 5){
                atomicInteger.incrementAndGet();
                return false;
            }
            return true;
        })
        .switchOnFirst((signal, flux) -> {
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                   System.out::println);