支持补货慢于 ticks 的下游请求?
Support for downstream requests that replenish slower than ticks?
我的 Spring 5.2 加载应用程序使用 Spring WebClient 来测试 REST API 性能。每个 API 在 "fixed" interval
循环中被调用 n
次:
AtomicLong rec = new AtomicLong();
for (long sendAt = System.nanoTime(), sent = 0; rec.get() < n;) {
if (sent < n && System.nanoTime() >= sendAt) {
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.doOnSuccess(response -> rec.incrementAndGet())
.subscribe()
;
sendAt += interval;
sent++;
}
}
这很好用。但是,以 declarative/reactive 风格编写等价物是在逃避我。天真地折叠循环和条件:
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.repeatWhen(f -> Flux.interval(Duration.ofNanos(interval)))
.take(n)
.blockLast()
;
客户端跟不上时失败:
Could not emit tick 73685 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
那么,proper/reactive 实现命令式循环功能的方法是什么?
来自 David Karnok 的评论;使用他的 RxJava2 扩展:
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.8</version>
</dependency>
你可以做到:
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.repeatWhen(f -> Flowables.intervalBackpressure(interval, TimeUnit.NANOSECONDS))
.take(n)
.blockLast()
;
当客户端跟不上时不会失败,但 .header(SOME_HEADER, someHeaderValue())
只被调用一次(与命令式循环不同); someHeaderValue()
必须在每次重复时更新。
我的 Spring 5.2 加载应用程序使用 Spring WebClient 来测试 REST API 性能。每个 API 在 "fixed" interval
循环中被调用 n
次:
AtomicLong rec = new AtomicLong();
for (long sendAt = System.nanoTime(), sent = 0; rec.get() < n;) {
if (sent < n && System.nanoTime() >= sendAt) {
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.doOnSuccess(response -> rec.incrementAndGet())
.subscribe()
;
sendAt += interval;
sent++;
}
}
这很好用。但是,以 declarative/reactive 风格编写等价物是在逃避我。天真地折叠循环和条件:
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.repeatWhen(f -> Flux.interval(Duration.ofNanos(interval)))
.take(n)
.blockLast()
;
客户端跟不上时失败:
Could not emit tick 73685 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
那么,proper/reactive 实现命令式循环功能的方法是什么?
来自 David Karnok 的评论;使用他的 RxJava2 扩展:
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.8</version>
</dependency>
你可以做到:
webClient
.post()
.uri(uri)
.accept(ACCEPT_TYPE)
.header(SOME_HEADER, someHeaderValue())
.body(BodyInserters.fromObject(obj)
.exchange()
.repeatWhen(f -> Flowables.intervalBackpressure(interval, TimeUnit.NANOSECONDS))
.take(n)
.blockLast()
;
当客户端跟不上时不会失败,但 .header(SOME_HEADER, someHeaderValue())
只被调用一次(与命令式循环不同); someHeaderValue()
必须在每次重复时更新。