在 Reactor Flux 中验证流的开始
Validate beginning of stream in Reactor Flux
使用 Reactor,我正在尝试验证冷 Flux 流的开始,然后成为直通。
例如,假设我需要验证前 N 个元素。如果(且仅当)它通过,则转发这些和更多元素。如果失败,只会发出错误。
这就是我目前所拥有的。它有效,但是有没有更好或更正确的方法来做到这一点?我很想实现自己的运算符,但有人告诉我这很复杂,不推荐。
flux
.bufferUntil(new Predicate<>() {
private int count = 0;
@Override
public boolean test(T next) {
return ++count >= N;
}
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
s.next(cur);
return cur + 1;
}))
.map(t -> {
if (t.getT2() == 0 && !validate(t.getT1()))
throw new RuntimeException("Invalid");
return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())
我本可以使用 doOnNext
而不是第二个 map
,因为它不映射任何内容,但我不确定它是否可以接受使用 peek 方法。
我也可以在第二个 map
到 运行 中只使用一次有状态映射器,而不是用索引压缩,我想这是可以接受的,因为我已经在使用有状态谓词了.. .
您的要求听起来很有趣!我们有 switchOnFirst
这可能对验证第一个元素很有用。但是如果你有 N 个元素要验证,我们可以尝试这样的事情。
这里我假设我必须验证前 5 个元素应该 <= 5。然后它是一个有效的流。否则我们会简单地抛出错误,说验证失败。
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
integerFlux
.buffer(5)
.switchOnFirst((signal, flux) -> {
//first 5 elements are <= 5, then it is a valid stream
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
然而,这种方法并不好,因为它每次都会收集 5 个元素,即使在我们可能不希望的第一次验证完成后也是如此。
为了避免在验证后缓冲 N 个元素,我们可以使用 bufferUntil
。一旦我们收集了前 N 个元素并进行了验证,它就会在接收到时将第 1 个元素传递给下游。
AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
.bufferUntil(i -> {
if(atomicInteger.get() < 5){
atomicInteger.incrementAndGet();
return false;
}
return true;
})
.switchOnFirst((signal, flux) -> {
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
使用 Reactor,我正在尝试验证冷 Flux 流的开始,然后成为直通。
例如,假设我需要验证前 N 个元素。如果(且仅当)它通过,则转发这些和更多元素。如果失败,只会发出错误。
这就是我目前所拥有的。它有效,但是有没有更好或更正确的方法来做到这一点?我很想实现自己的运算符,但有人告诉我这很复杂,不推荐。
flux
.bufferUntil(new Predicate<>() {
private int count = 0;
@Override
public boolean test(T next) {
return ++count >= N;
}
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
s.next(cur);
return cur + 1;
}))
.map(t -> {
if (t.getT2() == 0 && !validate(t.getT1()))
throw new RuntimeException("Invalid");
return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())
我本可以使用 doOnNext
而不是第二个 map
,因为它不映射任何内容,但我不确定它是否可以接受使用 peek 方法。
我也可以在第二个 map
到 运行 中只使用一次有状态映射器,而不是用索引压缩,我想这是可以接受的,因为我已经在使用有状态谓词了.. .
您的要求听起来很有趣!我们有 switchOnFirst
这可能对验证第一个元素很有用。但是如果你有 N 个元素要验证,我们可以尝试这样的事情。
这里我假设我必须验证前 5 个元素应该 <= 5。然后它是一个有效的流。否则我们会简单地抛出错误,说验证失败。
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
integerFlux
.buffer(5)
.switchOnFirst((signal, flux) -> {
//first 5 elements are <= 5, then it is a valid stream
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
然而,这种方法并不好,因为它每次都会收集 5 个元素,即使在我们可能不希望的第一次验证完成后也是如此。
为了避免在验证后缓冲 N 个元素,我们可以使用 bufferUntil
。一旦我们收集了前 N 个元素并进行了验证,它就会在接收到时将第 1 个元素传递给下游。
AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
.bufferUntil(i -> {
if(atomicInteger.get() < 5){
atomicInteger.incrementAndGet();
return false;
}
return true;
})
.switchOnFirst((signal, flux) -> {
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);