Project Reactor 中触发错误的 onErrorContinue 值为 null

Project Reactor in onErrorContinue value that triggered error is null

我在使用项目反应器编写的代码方面遇到了一些问题:

<dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.2.12.RELEASE</version>
</dependency>

请考虑以下代码:

class Scratch {
    public static void main(String[] args) {

        ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(10);

        startProducer(q);

        Flux.<Long> create(sink -> consumeItemsFromQueue(q, sink))
                .doOnNext(ctx -> System.out.println("Processing " + ctx))
                .flatMap(ctx -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
                .onErrorContinue((ex, obj)->{
                    System.err.println("Caught error "+ex.getMessage() +" in obj:" +obj);
                })
                .doOnNext(element -> System.out.println("Do On NExt: " + element))
                .subscribe();

    }

    private static void consumeItemsFromQueue(ArrayBlockingQueue<Long> q, FluxSink<Long> sink) {
        while (true) {
            try {
                sink.next(q.take());
            } catch (Throwable t) {
                System.err.println("Error in catch");
            }
        }
    }

    private static void startProducer(ArrayBlockingQueue<Long> q) {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    q.put(System.currentTimeMillis());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
}

此代码产生以下输出:

Processing 1580494319870
Caught error bum! in obj:null
Processing 1580494321871
Caught error bum! in obj:null

根据 onErrorContinue 中的文档,对象应该是导致错误的值。因此我希望它是来自 flatMapctx 对象。相反它是空的。

这是错误还是我对文档的理解有误?

关于 onErrorContinue 行为的推理可能相当违反直觉,因此我总是建议尽可能避免使用它。

According to the documentation in onErrorContinue the object should be the value that caused the error. Therefore I would expect it to be the ctx object from flatMap. Instead it is null.

啊,但是 ctx 不是 导致错误的值,因为您的外部 flatMap() 调用工作正常 - 它只是中继inner Flux(您示例中的 Flux.push() 行)中发生的错误。因为没有导致此错误的值(它只是抛出异常) , 没有报告价值。因此,您在此示例中报告的行为正是我所期望的。

如果您将该行更改为类似以下内容:

.flatMap(ctx -> Flux.push(sink -> sink.next(ctx)).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))

...或者:

.flatMap(ctx -> Flux.just(ctx).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))

...然后你会看到类似于 Caught error bum! in obj:1591657236326 的内容,因为那里的异常实际上 原因,即由操作员处理引起的错误该值。