不在 Mono.cache() 中缓存错误信号
Not caching error signals in Mono.cache()
你好反应堆伙计们 - 我试图编写一些反应式代码(令人惊讶,嗯?)并且遇到了轻微的障碍。我认为它 可能 是 reactor 的错误,但我想在发布错误之前先在这里问一下。
对于上下文:我有一个缓存 Map<Key, Mono<Value>>
。客户端将请求数据 - 我们检查缓存并使用本质上是 computeIfAbsent
的内容将 Mono
和 .cache()
放入缓存(如果该键尚未缓存任何内容)。客户端然后获取 Mono
和 do magic(此处不相关)。现在,要注意的是缓存的填充可能会遇到暂时性错误,所以我们不想缓存错误——当前请求会出错但是 "next" 客户端在订阅时应该触发整个管道重新运行。
阅读过,例如this closed issue, I settled on Mono#cache(ttlForValue, ttlForError, ttlForEmpty)
。
这就是事情变得有趣的地方。
因为我不想缓存 error
(或 empty
,但忽略它)信号,我发现以下文档很有希望
If the relevant TTL generator throws any Exception
, that exception will be propagated to the Subscriber that encountered the cache miss, but the cache will be immediately cleared, so further Subscribers might re-populate the cache in case the error was transient. In case the source was emitting an error, that error is dropped and added as a suppressed exception. In case the source was emitting a value, that value is dropped.
强调我的
所以我尝试了以下方法(无耻地抄袭了链接 GitHub 问题中的示例)
public class TestBench {
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
输出
09:12:23.991 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Calling service with availability: false
09:12:24.034 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris..boris.testbench.TestBench.main(TestBench.java:26)
Caught exception : reactor.core.Exceptions$BubblingException: java.lang.RuntimeException: Error
Exception in thread "main" java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:26)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1475)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:31)
好吧,这没有用 - 错误被缓存,第二个订阅者只看到同样的错误。
看the code原因很明显
Duration ttl = null;
try {
ttl = main.ttlGenerator.apply(signal);
}
catch (Throwable generatorError) {
signalToPropagate = Signal.error(generatorError);
STATE.set(main, signalToPropagate); //HERE
if (signal.isOnError()) {
//noinspection ThrowableNotThrown
Exceptions.addSuppressed(generatorError, signal.getThrowable());
}
}
STATE
设置为error
信号,根本没有清除。但这还不是全部,
代码没有清除缓存的原因在这个块下面
if (ttl != null) {
main.clock.schedule(main, ttl.toMillis(), TimeUnit.MILLISECONDS);
}
else {
//error during TTL generation, signal != updatedSignal, aka dropped
if (signal.isOnNext()) {
Operators.onNextDropped(signal.get(), currentContext());
}
else if (signal.isOnError()) {
Operators.onErrorDropped(signal.getThrowable(), currentContext());
}
//immediate cache clear
main.run();
}
在这种情况下 ttl == null
因为 ttl
的生成抛出了 Exception
。 signal
是一个 error
,因此进入该分支并调用 Operators.onErrorDropped
public static void onErrorDropped(Throwable e, Context context) {
Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
if (hook == null) {
hook = Hooks.onErrorDroppedHook;
}
if (hook == null) {
log.error("Operator called default onErrorDropped", e);
throw Exceptions.bubble(e);
}
hook.accept(e);
}
所以在这里我们可以看到,如果上下文中没有 onError
挂钩并且没有设置默认值,则调用 throw Exceptions.bubble(e)
并且 MonoCacheTime
returns 中的代码早期,未能致电 main.run()
。因此错误保持缓存无限期因为没有TTL!
以下代码修复了该问题
public class TestBench {
private static final Logger logger = LoggerFactory.getLogger(TestBench.class);
private static final Consumer<Throwable> onErrorDropped = e -> logger.error("Dropped", e);
static {
//add default hook
Hooks.onErrorDropped(onErrorDropped);
}
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
但是 这增加了一个全局 Hook,这并不理想。该代码暗示了添加每个管道挂钩的能力,但我不知道该怎么做。以下是有效的,但显然是 hack
.subscriberContext(ctx -> ctx.put("reactor.onErrorDropped.local", onErrorDropped))
问题
- 以上是错误吗,缺少
onErrorDropped
挂钩会导致错误被无限期缓存吗?
- 有没有办法在
subscriberContext
而不是全局设置 onErrorDropped
挂钩?
跟进
来自代码;似乎支持从 TTL 生成器函数返回 null
,并且在立即清除信号时具有相同的行为。如果不是,订阅者看到的是原始错误,而不是来自 TTL 生成器的错误和一个被抑制的错误——这看起来可能更整洁
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> null,
//meh
() -> null);
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
是否支持这种行为?是否应该记录在案?
您确实发现了一个错误!而且我认为文档也可以针对 cache
:
的这种变体进行改进
- 关注它如何处理 TTL 内部的异常
Function
可能具有误导性
- 源中应该有一个"ignoring"一类信号的直接记录方式(你就是这种情况:当源出错时,你希望后续订阅者"retry")。
- 由于使用
onErrorDropped
(默认抛出丢弃的异常,从而防止 main.run()
状态重置),该行为存在错误。
不幸的是,测试使用 StepVerifier#verifyThenAssertThat()
,它设置了一个 onErrorDropped
挂钩,因此最后一个错误从未被识别。
在 TTL 函数中返回 null
并没有更好地工作,因为发生了同样的错误,但这次原始源异常是 dropped/bubbled。
但是有一个理想的语义可以将错误传播给第一个订阅者并让第二个订阅者重试:to return Duration.ZERO
in the ttl Function
。这是未记录的,但现在可以使用:
IllegalStateException exception = new IllegalStateException("boom");
AtomicInteger count = new AtomicInteger();
Mono<Integer> source = Mono.fromCallable(() -> {
int c = count.incrementAndGet();
if (c == 1) throw exception;
return c;
});
Mono<Integer> cache = source.cache(v -> Duration.ofSeconds(10),
e -> Duration.ZERO,
() -> Duration.ofSeconds(10));
assertThat(cache.retry().block()).isEqualTo(2);
我将打开一个问题来修复状态重置错误并将 javadoc 集中在上述解决方案上,同时将处理抛出 TTL 函数的位移动到最后一个单独的较短段落中。
你好反应堆伙计们 - 我试图编写一些反应式代码(令人惊讶,嗯?)并且遇到了轻微的障碍。我认为它 可能 是 reactor 的错误,但我想在发布错误之前先在这里问一下。
对于上下文:我有一个缓存 Map<Key, Mono<Value>>
。客户端将请求数据 - 我们检查缓存并使用本质上是 computeIfAbsent
的内容将 Mono
和 .cache()
放入缓存(如果该键尚未缓存任何内容)。客户端然后获取 Mono
和 do magic(此处不相关)。现在,要注意的是缓存的填充可能会遇到暂时性错误,所以我们不想缓存错误——当前请求会出错但是 "next" 客户端在订阅时应该触发整个管道重新运行。
阅读过,例如this closed issue, I settled on Mono#cache(ttlForValue, ttlForError, ttlForEmpty)
。
这就是事情变得有趣的地方。
因为我不想缓存 error
(或 empty
,但忽略它)信号,我发现以下文档很有希望
If the relevant TTL generator throws any
Exception
, that exception will be propagated to the Subscriber that encountered the cache miss, but the cache will be immediately cleared, so further Subscribers might re-populate the cache in case the error was transient. In case the source was emitting an error, that error is dropped and added as a suppressed exception. In case the source was emitting a value, that value is dropped.
强调我的
所以我尝试了以下方法(无耻地抄袭了链接 GitHub 问题中的示例)
public class TestBench {
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
输出
09:12:23.991 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Calling service with availability: false
09:12:24.034 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris..boris.testbench.TestBench.main(TestBench.java:26)
Caught exception : reactor.core.Exceptions$BubblingException: java.lang.RuntimeException: Error
Exception in thread "main" java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:26)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1475)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:31)
好吧,这没有用 - 错误被缓存,第二个订阅者只看到同样的错误。
看the code原因很明显
Duration ttl = null;
try {
ttl = main.ttlGenerator.apply(signal);
}
catch (Throwable generatorError) {
signalToPropagate = Signal.error(generatorError);
STATE.set(main, signalToPropagate); //HERE
if (signal.isOnError()) {
//noinspection ThrowableNotThrown
Exceptions.addSuppressed(generatorError, signal.getThrowable());
}
}
STATE
设置为error
信号,根本没有清除。但这还不是全部,
代码没有清除缓存的原因在这个块下面
if (ttl != null) {
main.clock.schedule(main, ttl.toMillis(), TimeUnit.MILLISECONDS);
}
else {
//error during TTL generation, signal != updatedSignal, aka dropped
if (signal.isOnNext()) {
Operators.onNextDropped(signal.get(), currentContext());
}
else if (signal.isOnError()) {
Operators.onErrorDropped(signal.getThrowable(), currentContext());
}
//immediate cache clear
main.run();
}
在这种情况下 ttl == null
因为 ttl
的生成抛出了 Exception
。 signal
是一个 error
,因此进入该分支并调用 Operators.onErrorDropped
public static void onErrorDropped(Throwable e, Context context) {
Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
if (hook == null) {
hook = Hooks.onErrorDroppedHook;
}
if (hook == null) {
log.error("Operator called default onErrorDropped", e);
throw Exceptions.bubble(e);
}
hook.accept(e);
}
所以在这里我们可以看到,如果上下文中没有 onError
挂钩并且没有设置默认值,则调用 throw Exceptions.bubble(e)
并且 MonoCacheTime
returns 中的代码早期,未能致电 main.run()
。因此错误保持缓存无限期因为没有TTL!
以下代码修复了该问题
public class TestBench {
private static final Logger logger = LoggerFactory.getLogger(TestBench.class);
private static final Consumer<Throwable> onErrorDropped = e -> logger.error("Dropped", e);
static {
//add default hook
Hooks.onErrorDropped(onErrorDropped);
}
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
但是 这增加了一个全局 Hook,这并不理想。该代码暗示了添加每个管道挂钩的能力,但我不知道该怎么做。以下是有效的,但显然是 hack
.subscriberContext(ctx -> ctx.put("reactor.onErrorDropped.local", onErrorDropped))
问题
- 以上是错误吗,缺少
onErrorDropped
挂钩会导致错误被无限期缓存吗? - 有没有办法在
subscriberContext
而不是全局设置onErrorDropped
挂钩?
跟进
来自代码;似乎支持从 TTL 生成器函数返回 null
,并且在立即清除信号时具有相同的行为。如果不是,订阅者看到的是原始错误,而不是来自 TTL 生成器的错误和一个被抑制的错误——这看起来可能更整洁
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> null,
//meh
() -> null);
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
是否支持这种行为?是否应该记录在案?
您确实发现了一个错误!而且我认为文档也可以针对 cache
:
- 关注它如何处理 TTL 内部的异常
Function
可能具有误导性 - 源中应该有一个"ignoring"一类信号的直接记录方式(你就是这种情况:当源出错时,你希望后续订阅者"retry")。
- 由于使用
onErrorDropped
(默认抛出丢弃的异常,从而防止main.run()
状态重置),该行为存在错误。
不幸的是,测试使用 StepVerifier#verifyThenAssertThat()
,它设置了一个 onErrorDropped
挂钩,因此最后一个错误从未被识别。
在 TTL 函数中返回 null
并没有更好地工作,因为发生了同样的错误,但这次原始源异常是 dropped/bubbled。
但是有一个理想的语义可以将错误传播给第一个订阅者并让第二个订阅者重试:to return Duration.ZERO
in the ttl Function
。这是未记录的,但现在可以使用:
IllegalStateException exception = new IllegalStateException("boom");
AtomicInteger count = new AtomicInteger();
Mono<Integer> source = Mono.fromCallable(() -> {
int c = count.incrementAndGet();
if (c == 1) throw exception;
return c;
});
Mono<Integer> cache = source.cache(v -> Duration.ofSeconds(10),
e -> Duration.ZERO,
() -> Duration.ofSeconds(10));
assertThat(cache.retry().block()).isEqualTo(2);
我将打开一个问题来修复状态重置错误并将 javadoc 集中在上述解决方案上,同时将处理抛出 TTL 函数的位移动到最后一个单独的较短段落中。