反应堆项目超时
Project Reactor timeout
我正在一个项目反应器车间工作,并且被困在以下任务中:
/**
* TODO 5
* <p>
* For each item call received in colors flux call the {@link #simulateRemoteCall} operation.
* Timeout in case the {@link #simulateRemoteCall} does not return within 400 ms, but retry twice
* If still no response then provide "default" as a return value
*/
我无法解决的问题是 Flux 实际上从未抛出 TimeOutException!我能够在控制台日志中观察到这一点:
16:05:09.759 [main] INFO Part04HandlingErrors - Received red delaying for 300
16:05:09.781 [main] INFO Part04HandlingErrors - Received black delaying for 500
16:05:09.782 [main] INFO Part04HandlingErrors - Received tan delaying for 300
我尝试调整语句的顺序,但似乎并没有改变行为。注意:此外,我尝试了 timeout() 的重载变体,它接受一个应该返回的默认值,如果没有元素被发出的话。
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors
.timeout(Duration.ofMillis(400))
//.timeout(Duration.ofMillis(400), Mono.just("default"))
.retry(2)
.flatMap(this::simulateRemoteCall)
.onErrorReturn(TimeoutException.class, "default");
}
有人可以弄清楚为什么没有发生超时吗?我怀疑该机制在某种程度上不是 "bound" 到 flatMap 调用的方法。
为了完整性:辅助方法:
public Mono<String> simulateRemoteCall(String input) {
int delay = input.length() * 100;
return Mono.just(input)
.doOnNext(s -> log.info("Received {} delaying for {} ", s, delay))
.map(i -> "processed " + i)
.delayElement(Duration.of(delay, ChronoUnit.MILLIS));
}
更完整,这是我为了验证功能而给出的测试:
@Test
public void timeOutWithRetry() {
Flux<String> colors = Flux.just("red", "black", "tan");
Flux<String> results = workshop.timeOutWithRetry(colors);
StepVerifier.create(results).expectNext("processed red", "default", "processed tan").verifyComplete();
}
你说得对,是语句的顺序和位置不正确。
既然要retry/timeout/error-handle远程调用,就应该把这些操作符放在远程调用的Mono
上,而不是Flux
.
Timeout on Flux
观察后续元素之间经过的时间。但是,当您使用 flatMap
时,您可以开箱即用,并且元素之间的延迟实际上将为零(假设 colors
Flux
源自内存列表)。所以这个运算符不应该直接放在Flux
上来达到你的目的。
重试 Flux
意味着它会在出现错误时重新订阅源,这取决于源可能会导致重新处理已处理的元素。相反,您只想重试失败的元素,因此它也应该放在 Mono
.
上
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.flatMap(color -> simulateRemoteCall(color).timeout(Duration.ofMillis(400))
.retry(2)
.onErrorReturn("default"));
}
Martin Tarjányi 的回答是正确的,但是你在代码中也问了为什么
return colors
.timeout(Duration.ofMillis(400))
//.timeout(Duration.ofMillis(400), Mono.just("default"))
.retry(2)
.flatMap(this::simulateRemoteCall)
.onErrorReturn(TimeoutException.class, "default");
没有超时。
原因是如果 colors
通量的元素可用,那么调用 .timeout(Duration.ofMillis(400))
没有效果,因为 timeout
仅传播 TimeoutException
如果 没有 项目在给定的 400 毫秒持续时间内发出,但在此示例中并非如此。
因此元素被发射并且 retry(2)
也没有效果。接下来你在发出的元素上调用 simulateRemoteCall
这需要一些时间,但不会 return 错误。您的代码的结果(除了时间差异之外)与您只是在给定的通量上应用贴图相同:
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.map(s -> "processed " + s);
}
如果您想看到 simulateRemoteCall
调用超时,那么您必须在此调用后添加 timeout
方法。
除了使用 flatMap
,您还可以使用 concatMap
。不同之处在于是否应保留顺序,即 default
值是否可能乱序出现。
使用 concatMap
答案如下所示:
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.concatMap(
color -> simulateRemoteCall(color)
.timeout(Duration.ofMillis(400))
.retry(2)
.onErrorReturn("default"));
}
我正在一个项目反应器车间工作,并且被困在以下任务中:
/**
* TODO 5
* <p>
* For each item call received in colors flux call the {@link #simulateRemoteCall} operation.
* Timeout in case the {@link #simulateRemoteCall} does not return within 400 ms, but retry twice
* If still no response then provide "default" as a return value
*/
我无法解决的问题是 Flux 实际上从未抛出 TimeOutException!我能够在控制台日志中观察到这一点:
16:05:09.759 [main] INFO Part04HandlingErrors - Received red delaying for 300
16:05:09.781 [main] INFO Part04HandlingErrors - Received black delaying for 500
16:05:09.782 [main] INFO Part04HandlingErrors - Received tan delaying for 300
我尝试调整语句的顺序,但似乎并没有改变行为。注意:此外,我尝试了 timeout() 的重载变体,它接受一个应该返回的默认值,如果没有元素被发出的话。
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors
.timeout(Duration.ofMillis(400))
//.timeout(Duration.ofMillis(400), Mono.just("default"))
.retry(2)
.flatMap(this::simulateRemoteCall)
.onErrorReturn(TimeoutException.class, "default");
}
有人可以弄清楚为什么没有发生超时吗?我怀疑该机制在某种程度上不是 "bound" 到 flatMap 调用的方法。
为了完整性:辅助方法:
public Mono<String> simulateRemoteCall(String input) {
int delay = input.length() * 100;
return Mono.just(input)
.doOnNext(s -> log.info("Received {} delaying for {} ", s, delay))
.map(i -> "processed " + i)
.delayElement(Duration.of(delay, ChronoUnit.MILLIS));
}
更完整,这是我为了验证功能而给出的测试:
@Test
public void timeOutWithRetry() {
Flux<String> colors = Flux.just("red", "black", "tan");
Flux<String> results = workshop.timeOutWithRetry(colors);
StepVerifier.create(results).expectNext("processed red", "default", "processed tan").verifyComplete();
}
你说得对,是语句的顺序和位置不正确。
既然要retry/timeout/error-handle远程调用,就应该把这些操作符放在远程调用的Mono
上,而不是Flux
.
Timeout on Flux
观察后续元素之间经过的时间。但是,当您使用 flatMap
时,您可以开箱即用,并且元素之间的延迟实际上将为零(假设 colors
Flux
源自内存列表)。所以这个运算符不应该直接放在Flux
上来达到你的目的。
重试 Flux
意味着它会在出现错误时重新订阅源,这取决于源可能会导致重新处理已处理的元素。相反,您只想重试失败的元素,因此它也应该放在 Mono
.
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.flatMap(color -> simulateRemoteCall(color).timeout(Duration.ofMillis(400))
.retry(2)
.onErrorReturn("default"));
}
Martin Tarjányi 的回答是正确的,但是你在代码中也问了为什么
return colors
.timeout(Duration.ofMillis(400))
//.timeout(Duration.ofMillis(400), Mono.just("default"))
.retry(2)
.flatMap(this::simulateRemoteCall)
.onErrorReturn(TimeoutException.class, "default");
没有超时。
原因是如果 colors
通量的元素可用,那么调用 .timeout(Duration.ofMillis(400))
没有效果,因为 timeout
仅传播 TimeoutException
如果 没有 项目在给定的 400 毫秒持续时间内发出,但在此示例中并非如此。
因此元素被发射并且 retry(2)
也没有效果。接下来你在发出的元素上调用 simulateRemoteCall
这需要一些时间,但不会 return 错误。您的代码的结果(除了时间差异之外)与您只是在给定的通量上应用贴图相同:
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.map(s -> "processed " + s);
}
如果您想看到 simulateRemoteCall
调用超时,那么您必须在此调用后添加 timeout
方法。
除了使用 flatMap
,您还可以使用 concatMap
。不同之处在于是否应保留顺序,即 default
值是否可能乱序出现。
使用 concatMap
答案如下所示:
public Flux<String> timeOutWithRetry(Flux<String> colors) {
return colors.concatMap(
color -> simulateRemoteCall(color)
.timeout(Duration.ofMillis(400))
.retry(2)
.onErrorReturn("default"));
}