N个元素中的样本1

Sample 1 in N elements

我有一个很长的运行 Flux,想在 N 个元素中记录 1 以监控进度。以下代码以 N 毫秒记录一次。

Flux
    .fromStream(
        IntStream
            .range(1, 101)
            .mapToObj(Integer::valueOf)
    )
    .sample(Duration.ofMillis(2))
    .subscribe(e -> log.debug(e.toString()));

听起来像 sample(Publisher...) 可用于通过为第 1 个元素生成 Mono.Just("") 和为其余元素生成 Mono.empty() 来实现在 N 个元素中记录 1。但是该方法不提供被采样的元素。请求有关如何解决此问题的想法?

您可以使用 Flux#index 获取索引 Flux 并记录每个第 n 个元素。这可以使用 Flux#transform.

干净地合并到您的链中

log1InN实用方法:

<T> Flux<T> log1InN(Flux<T> source, int n) {
        return source.index()
                .doOnNext(e -> {
                            if (e.getT1() % n == 0) {
                                log.info("Element {}: {}", e.getT1(), e.getT2());
                            }
                        }
                ).map(Tuple2::getT2);
    }

然后用它来记录例如每第十个元素:

Flux.fromStream(
                IntStream
                        .range(1, 101)
                        .mapToObj(Integer::valueOf)
        )
                .transform(f -> log1InN(f, 10))
                .subscribe();