"Operator called default onErrorDropped" 单声道超时
"Operator called default onErrorDropped" on Mono timeout
在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重现这些错误:
@Test
public void testScheduler() {
Mono<String> callableMethod1 = callableMethod();
callableMethod1.block();
Mono<String> callableMethod2 = callableMethod();
callableMethod2.block();
}
private Mono<String> callableMethod() {
return Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"));
}
在 Mono.fromCallable
中,我正在使用第三方库进行阻塞调用。当此调用超时时,我会收到类似于
的错误
reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
这些错误似乎也是间歇性的,有时当我 运行 提供的代码时我完全没有错误。但是,当我以 10 次为单位重复呼叫时,我始终都能收到它们。
问题:为什么会出现这个错误?
回答:
当给定给 timeout() 运算符的持续时间已经过去时,它会抛出一个 TimeoutException。结果如下:
一个onError信号被发送到主反应链。结果,主执行恢复并且进程继续(即执行 onErrorResume())。
在结果 #1 之后不久,fromCallable() 中定义的异步任务被中断,这会触发第二个异常(InterruptedException).主反应链无法再处理这个 InterruptedException 因为 TimeoutException 首先发生并且已经导致主反应链恢复(注意: 这种不生成第二个 onError 信号的行为符合 Reactive Stream Specification -> Publisher #7).
由于主链无法正常处理第二个异常(InterruptedException),Reactor 将其记录在错误级别,让我们知道发生了意外异常。
问题:如何摆脱它们?
简答:使用Hooks.onErrorDropped()更改日志级别:
Logger logger = Logger.getLogger(this.getClass().getName());
@Test
public void test() {
Hooks.onErrorDropped(error -> {
logger.log(Level.WARNING, "Exception happened:", error);
});
Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
长答案:如果您的用例允许,您可以处理 fromCallable() 中发生的异常,以便唯一的异常影响主链的是 TimeoutException。在那种情况下,onErrorDropped() 一开始就不会发生。
@Test
public void test() {
Mono.fromCallable(() -> {
try {
Thread.sleep(60);
} catch (InterruptedException ex) {
//release resources, rollback actions, etc
logger.log(Level.WARNING, "Something went wrong...", ex);
}
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
额外参考资料:
在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重现这些错误:
@Test
public void testScheduler() {
Mono<String> callableMethod1 = callableMethod();
callableMethod1.block();
Mono<String> callableMethod2 = callableMethod();
callableMethod2.block();
}
private Mono<String> callableMethod() {
return Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"));
}
在 Mono.fromCallable
中,我正在使用第三方库进行阻塞调用。当此调用超时时,我会收到类似于
reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
这些错误似乎也是间歇性的,有时当我 运行 提供的代码时我完全没有错误。但是,当我以 10 次为单位重复呼叫时,我始终都能收到它们。
问题:为什么会出现这个错误?
回答:
当给定给 timeout() 运算符的持续时间已经过去时,它会抛出一个 TimeoutException。结果如下:
一个onError信号被发送到主反应链。结果,主执行恢复并且进程继续(即执行 onErrorResume())。
在结果 #1 之后不久,fromCallable() 中定义的异步任务被中断,这会触发第二个异常(InterruptedException).主反应链无法再处理这个 InterruptedException 因为 TimeoutException 首先发生并且已经导致主反应链恢复(注意: 这种不生成第二个 onError 信号的行为符合 Reactive Stream Specification -> Publisher #7).
由于主链无法正常处理第二个异常(InterruptedException),Reactor 将其记录在错误级别,让我们知道发生了意外异常。
问题:如何摆脱它们?
简答:使用Hooks.onErrorDropped()更改日志级别:
Logger logger = Logger.getLogger(this.getClass().getName());
@Test
public void test() {
Hooks.onErrorDropped(error -> {
logger.log(Level.WARNING, "Exception happened:", error);
});
Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
长答案:如果您的用例允许,您可以处理 fromCallable() 中发生的异常,以便唯一的异常影响主链的是 TimeoutException。在那种情况下,onErrorDropped() 一开始就不会发生。
@Test
public void test() {
Mono.fromCallable(() -> {
try {
Thread.sleep(60);
} catch (InterruptedException ex) {
//release resources, rollback actions, etc
logger.log(Level.WARNING, "Something went wrong...", ex);
}
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
额外参考资料: