N个元素中的样本1
Sample 1 in N elements
我有一个很长的运行 Flux,想在 N 个元素中记录 1 以监控进度。以下代码以 N 毫秒记录一次。
Flux
.fromStream(
IntStream
.range(1, 101)
.mapToObj(Integer::valueOf)
)
.sample(Duration.ofMillis(2))
.subscribe(e -> log.debug(e.toString()));
听起来像 sample(Publisher...) 可用于通过为第 1 个元素生成 Mono.Just("")
和为其余元素生成 Mono.empty()
来实现在 N 个元素中记录 1。但是该方法不提供被采样的元素。请求有关如何解决此问题的想法?
您可以使用 Flux#index
获取索引 Flux 并记录每个第 n 个元素。这可以使用 Flux#transform
.
干净地合并到您的链中
log1InN
实用方法:
<T> Flux<T> log1InN(Flux<T> source, int n) {
return source.index()
.doOnNext(e -> {
if (e.getT1() % n == 0) {
log.info("Element {}: {}", e.getT1(), e.getT2());
}
}
).map(Tuple2::getT2);
}
然后用它来记录例如每第十个元素:
Flux.fromStream(
IntStream
.range(1, 101)
.mapToObj(Integer::valueOf)
)
.transform(f -> log1InN(f, 10))
.subscribe();
我有一个很长的运行 Flux,想在 N 个元素中记录 1 以监控进度。以下代码以 N 毫秒记录一次。
Flux
.fromStream(
IntStream
.range(1, 101)
.mapToObj(Integer::valueOf)
)
.sample(Duration.ofMillis(2))
.subscribe(e -> log.debug(e.toString()));
听起来像 sample(Publisher...) 可用于通过为第 1 个元素生成 Mono.Just("")
和为其余元素生成 Mono.empty()
来实现在 N 个元素中记录 1。但是该方法不提供被采样的元素。请求有关如何解决此问题的想法?
您可以使用 Flux#index
获取索引 Flux 并记录每个第 n 个元素。这可以使用 Flux#transform
.
log1InN
实用方法:
<T> Flux<T> log1InN(Flux<T> source, int n) {
return source.index()
.doOnNext(e -> {
if (e.getT1() % n == 0) {
log.info("Element {}: {}", e.getT1(), e.getT2());
}
}
).map(Tuple2::getT2);
}
然后用它来记录例如每第十个元素:
Flux.fromStream(
IntStream
.range(1, 101)
.mapToObj(Integer::valueOf)
)
.transform(f -> log1InN(f, 10))
.subscribe();