支持补货慢于 ticks 的下游请求?

Support for downstream requests that replenish slower than ticks?

我的 Spring 5.2 加载应用程序使用 Spring WebClient 来测试 REST API 性能。每个 API 在 "fixed" interval 循环中被调用 n 次:

AtomicLong rec = new AtomicLong();

for (long sendAt = System.nanoTime(), sent = 0; rec.get() < n;) {
    if (sent < n && System.nanoTime() >= sendAt) {
        webClient
            .post()
            .uri(uri)
            .accept(ACCEPT_TYPE)
            .header(SOME_HEADER, someHeaderValue())
            .body(BodyInserters.fromObject(obj)
            .exchange()
            .doOnSuccess(response -> rec.incrementAndGet())
            .subscribe()
            ;

        sendAt += interval;

        sent++;
    }
}

这很好用。但是,以 declarative/reactive 风格编写等价物是在逃避我。天真地折叠循环和条件:

webClient
    .post()
    .uri(uri)
    .accept(ACCEPT_TYPE)
    .header(SOME_HEADER, someHeaderValue())
    .body(BodyInserters.fromObject(obj)
    .exchange()
    .repeatWhen(f -> Flux.interval(Duration.ofNanos(interval)))
    .take(n)
    .blockLast()
    ;

客户端跟不上时失败:

Could not emit tick 73685 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

那么,proper/reactive 实现命令式循环功能的方法是什么?

来自 David Karnok 的评论;使用他的 RxJava2 扩展:

<dependency>
    <groupId>com.github.akarnokd</groupId>
    <artifactId>rxjava2-extensions</artifactId>
    <version>0.20.8</version>
</dependency>

你可以做到:

webClient
    .post()
    .uri(uri)
    .accept(ACCEPT_TYPE)
    .header(SOME_HEADER, someHeaderValue())
    .body(BodyInserters.fromObject(obj)
    .exchange()
    .repeatWhen(f -> Flowables.intervalBackpressure(interval, TimeUnit.NANOSECONDS))
    .take(n)
    .blockLast()
    ;

当客户端跟不上时不会失败,但 .header(SOME_HEADER, someHeaderValue()) 只被调用一次(与命令式循环不同); someHeaderValue() 必须在每次重复时更新。