Project Reactor 中触发错误的 onErrorContinue 值为 null
Project Reactor in onErrorContinue value that triggered error is null
我在使用项目反应器编写的代码方面遇到了一些问题:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
请考虑以下代码:
class Scratch {
public static void main(String[] args) {
ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(10);
startProducer(q);
Flux.<Long> create(sink -> consumeItemsFromQueue(q, sink))
.doOnNext(ctx -> System.out.println("Processing " + ctx))
.flatMap(ctx -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught error "+ex.getMessage() +" in obj:" +obj);
})
.doOnNext(element -> System.out.println("Do On NExt: " + element))
.subscribe();
}
private static void consumeItemsFromQueue(ArrayBlockingQueue<Long> q, FluxSink<Long> sink) {
while (true) {
try {
sink.next(q.take());
} catch (Throwable t) {
System.err.println("Error in catch");
}
}
}
private static void startProducer(ArrayBlockingQueue<Long> q) {
Thread thread = new Thread(() -> {
while (true) {
try {
q.put(System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
此代码产生以下输出:
Processing 1580494319870
Caught error bum! in obj:null
Processing 1580494321871
Caught error bum! in obj:null
根据 onErrorContinue
中的文档,对象应该是导致错误的值。因此我希望它是来自 flatMap
的 ctx
对象。相反它是空的。
这是错误还是我对文档的理解有误?
关于 onErrorContinue
行为的推理可能相当违反直觉,因此我总是建议尽可能避免使用它。
According to the documentation in onErrorContinue the object should be the value that caused the error. Therefore I would expect it to be the ctx object from flatMap. Instead it is null.
啊,但是 ctx
不是 导致错误的值,因为您的外部 flatMap()
调用工作正常 - 它只是中继inner Flux
(您示例中的 Flux.push()
行)中发生的错误。因为没有导致此错误的值(它只是抛出异常) , 没有报告价值。因此,您在此示例中报告的行为正是我所期望的。
如果您将该行更改为类似以下内容:
.flatMap(ctx -> Flux.push(sink -> sink.next(ctx)).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))
...或者:
.flatMap(ctx -> Flux.just(ctx).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))
...然后你会看到类似于 Caught error bum! in obj:1591657236326
的内容,因为那里的异常实际上 有 原因,即由操作员处理引起的错误该值。
我在使用项目反应器编写的代码方面遇到了一些问题:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
请考虑以下代码:
class Scratch {
public static void main(String[] args) {
ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(10);
startProducer(q);
Flux.<Long> create(sink -> consumeItemsFromQueue(q, sink))
.doOnNext(ctx -> System.out.println("Processing " + ctx))
.flatMap(ctx -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught error "+ex.getMessage() +" in obj:" +obj);
})
.doOnNext(element -> System.out.println("Do On NExt: " + element))
.subscribe();
}
private static void consumeItemsFromQueue(ArrayBlockingQueue<Long> q, FluxSink<Long> sink) {
while (true) {
try {
sink.next(q.take());
} catch (Throwable t) {
System.err.println("Error in catch");
}
}
}
private static void startProducer(ArrayBlockingQueue<Long> q) {
Thread thread = new Thread(() -> {
while (true) {
try {
q.put(System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
此代码产生以下输出:
Processing 1580494319870
Caught error bum! in obj:null
Processing 1580494321871
Caught error bum! in obj:null
根据 onErrorContinue
中的文档,对象应该是导致错误的值。因此我希望它是来自 flatMap
的 ctx
对象。相反它是空的。
这是错误还是我对文档的理解有误?
关于 onErrorContinue
行为的推理可能相当违反直觉,因此我总是建议尽可能避免使用它。
According to the documentation in onErrorContinue the object should be the value that caused the error. Therefore I would expect it to be the ctx object from flatMap. Instead it is null.
啊,但是 ctx
不是 导致错误的值,因为您的外部 flatMap()
调用工作正常 - 它只是中继inner Flux
(您示例中的 Flux.push()
行)中发生的错误。因为没有导致此错误的值(它只是抛出异常) , 没有报告价值。因此,您在此示例中报告的行为正是我所期望的。
如果您将该行更改为类似以下内容:
.flatMap(ctx -> Flux.push(sink -> sink.next(ctx)).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))
...或者:
.flatMap(ctx -> Flux.just(ctx).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))
...然后你会看到类似于 Caught error bum! in obj:1591657236326
的内容,因为那里的异常实际上 有 原因,即由操作员处理引起的错误该值。