如何在 Project Reactor 中实现轮询逻辑?

How do you implement Polling Logic in Project Reactor?

我有一个方法可以发送请求以获取作业状态和 returns 状态,它看起来像这样:

Mono<JobStatus> getJobStatus() {...}

JobStatus 对象有一个方法 JobStatus.isDone() returns 无论挂起的作业是否完成。

有没有办法让我继续订阅单声道直到 JobStatus.isDone() 为真?即类似 getJobStatus().keepSubscribingUntil(status -> status.isDone())

一个选项是 getJobStatus() Mono 仅在工作完成时发出,这可能不一定容易,具体取决于 Mono 当前的实施方式。

对于投票,假设您每次订阅时都会进行 Mono 投票 您可以将 repeatWhentakeUntil 配对使用:

getJobStatus()
    .repeatWhen(completed -> completed.delayElements(Duration.ofMillis(pollDelay))) //(1)
    .takeUntil(JobStatus::isDone) //(2)
    .last() //(3)

(1) 重复re-subscribe到源Mono(这会产生一个Flux<JobStatus>

(2) 一旦返回状态标记为done就取消上面的重复循环

(3) 切换回 Mono<JobStatus> 发出最后一次迭代的状态(因此标记为完成的状态)

其他选项将使用 repeatWhenEmpty 当您可能想要轮询直到您从发布者那里获得成功或失败。在某些情况下,您不想永远等待获得响应,而是希望超时,您可以使用相同的方法,在您拥有自己的逻辑或库的方法时使操作超时。

AtomicInteger c = new AtomicInteger();
Mono<String> source = Mono.defer(() -> c.getAndIncrement() < 3 
? Mono.empty() 
: Mono.just("7"));
return source.repeatWhenEmpty(4, Flux::cache);

在上面的示例中,4 指定了您要重试的最大次数。

source.repeatWhenEmpty(exponentialBackOff(Duration.ofMillis(1), 
                Duration.ofMillis(15),
                Duration.ofSeconds(1)));

https://github.com/cloudfoundry/cf-java-client 有 exponentialBackOff 实用程序方法。