如何在 Project Reactor 中实现轮询逻辑?
How do you implement Polling Logic in Project Reactor?
我有一个方法可以发送请求以获取作业状态和 returns 状态,它看起来像这样:
Mono<JobStatus> getJobStatus() {...}
JobStatus
对象有一个方法 JobStatus.isDone()
returns 无论挂起的作业是否完成。
有没有办法让我继续订阅单声道直到 JobStatus.isDone()
为真?即类似 getJobStatus().keepSubscribingUntil(status -> status.isDone())
一个选项是 getJobStatus()
Mono
仅在工作完成时发出,这可能不一定容易,具体取决于 Mono
当前的实施方式。
对于投票,假设您每次订阅时都会进行 Mono
投票 您可以将 repeatWhen
与 takeUntil
配对使用:
getJobStatus()
.repeatWhen(completed -> completed.delayElements(Duration.ofMillis(pollDelay))) //(1)
.takeUntil(JobStatus::isDone) //(2)
.last() //(3)
(1) 重复re-subscribe到源Mono
(这会产生一个Flux<JobStatus>
)
(2) 一旦返回状态标记为done就取消上面的重复循环
(3) 切换回 Mono<JobStatus>
发出最后一次迭代的状态(因此标记为完成的状态)
其他选项将使用 repeatWhenEmpty 当您可能想要轮询直到您从发布者那里获得成功或失败。在某些情况下,您不想永远等待获得响应,而是希望超时,您可以使用相同的方法,在您拥有自己的逻辑或库的方法时使操作超时。
AtomicInteger c = new AtomicInteger();
Mono<String> source = Mono.defer(() -> c.getAndIncrement() < 3
? Mono.empty()
: Mono.just("7"));
return source.repeatWhenEmpty(4, Flux::cache);
在上面的示例中,4 指定了您要重试的最大次数。
source.repeatWhenEmpty(exponentialBackOff(Duration.ofMillis(1),
Duration.ofMillis(15),
Duration.ofSeconds(1)));
https://github.com/cloudfoundry/cf-java-client 有 exponentialBackOff 实用程序方法。
我有一个方法可以发送请求以获取作业状态和 returns 状态,它看起来像这样:
Mono<JobStatus> getJobStatus() {...}
JobStatus
对象有一个方法 JobStatus.isDone()
returns 无论挂起的作业是否完成。
有没有办法让我继续订阅单声道直到 JobStatus.isDone()
为真?即类似 getJobStatus().keepSubscribingUntil(status -> status.isDone())
一个选项是 getJobStatus()
Mono
仅在工作完成时发出,这可能不一定容易,具体取决于 Mono
当前的实施方式。
对于投票,假设您每次订阅时都会进行 Mono
投票 您可以将 repeatWhen
与 takeUntil
配对使用:
getJobStatus()
.repeatWhen(completed -> completed.delayElements(Duration.ofMillis(pollDelay))) //(1)
.takeUntil(JobStatus::isDone) //(2)
.last() //(3)
(1) 重复re-subscribe到源Mono
(这会产生一个Flux<JobStatus>
)
(2) 一旦返回状态标记为done就取消上面的重复循环
(3) 切换回 Mono<JobStatus>
发出最后一次迭代的状态(因此标记为完成的状态)
其他选项将使用 repeatWhenEmpty 当您可能想要轮询直到您从发布者那里获得成功或失败。在某些情况下,您不想永远等待获得响应,而是希望超时,您可以使用相同的方法,在您拥有自己的逻辑或库的方法时使操作超时。
AtomicInteger c = new AtomicInteger();
Mono<String> source = Mono.defer(() -> c.getAndIncrement() < 3
? Mono.empty()
: Mono.just("7"));
return source.repeatWhenEmpty(4, Flux::cache);
在上面的示例中,4 指定了您要重试的最大次数。
source.repeatWhenEmpty(exponentialBackOff(Duration.ofMillis(1),
Duration.ofMillis(15),
Duration.ofSeconds(1)));
https://github.com/cloudfoundry/cf-java-client 有 exponentialBackOff 实用程序方法。