在热流上重试操作?

retry operation on hot stream?

Flux.just(1, 2, 3)
        .doOnNext(__ -> System.out.println("producing number: " + __))
        .publish()
        .autoConnect()
        .doOnNext((Integer __) -> {
            System.out.println("throwing error.");
            throw new RuntimeException("aaa");
        })
        .retry(1, error -> true)
        .subscribe(number -> System.out.println("I will never be here..."),
                error -> System.out.println("will I be here? " + error),
                () -> System.out.println("completed!"));

输出:

producing number: 1
throwing error.
producing number: 2
producing number: 3

预期输出:(根据我的逻辑 - 我确定我错了)

producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3

为什么输出与预期输出不同?

这有点棘手,但归结为:

调用 publishautoConnect 的效果是,一旦您订阅,就会创建一个内部订阅,即使外部订阅被取消,该订阅也会持续存在。将其视为由 publishautoConnect.

连接的两个流
--inner stream--> publish/connect-> --outer stream--> subscriber

无论外流发生什么,内流都会继续运行。这是有意为之的,因为您有一个热可观察对象,并且您可能有多个订阅者。如果一个断开连接,那么其他人仍然希望从流中获得值。换句话说,发布运算符告诉源它可以根据需要生成值。

    --inner stream--> publish/connect-> --outer stream--> subscriber1
                                        --outer stream--> subscriber2

您可以通过删除 retry 来验证该行为。数字 1 和 2 仍将被打印。如果您希望源停止生成值,您可以使用 refCount 而不是 autoConnectrefCount 如果没有更多订阅者,则取消内部流。

棘手的部分来了:这个流是同步的并且 Streams are just functions。它在幕后有点复杂,但是重试运算符 运行s 中的订阅函数直到内部流完成。只有这样它才会重新订阅。

它与异步流不同,例如使用 Flux.interval 创建。

Flux.interval(Duration.ofSeconds(1))
    .doOnNext(__ -> System.out.println("producing number: " + __))
    .publish()
    .autoConnect()
    .doOnNext((Integer __) -> {
        System.out.println("throwing error.");
        throw new RuntimeException("aaa");
    })
    .retry(1)
    .subscribe()

当您调用 .subscribe() 时,retry 将在内部调用 doOnNext 上的订阅,依此类推,直到第一个 doOnNext 调用 Flux 上的 subscribe。间隔 Flux 表示 "Ok, I'll emit the first value in one second" 并且订阅已建立。

一秒后发出一个值,抛出一个错误,重试运算符再次订阅。

在您的示例中,在订阅完全建立之前,Flux 已经在订阅调用期间开始发出值,可以这么说。