在热流上重试操作?
retry operation on hot stream?
Flux.just(1, 2, 3)
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1, error -> true)
.subscribe(number -> System.out.println("I will never be here..."),
error -> System.out.println("will I be here? " + error),
() -> System.out.println("completed!"));
输出:
producing number: 1
throwing error.
producing number: 2
producing number: 3
预期输出:(根据我的逻辑 - 我确定我错了)
producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3
为什么输出与预期输出不同?
这有点棘手,但归结为:
调用 publish
和 autoConnect
的效果是,一旦您订阅,就会创建一个内部订阅,即使外部订阅被取消,该订阅也会持续存在。将其视为由 publish
和 autoConnect
.
连接的两个流
--inner stream--> publish/connect-> --outer stream--> subscriber
无论外流发生什么,内流都会继续运行。这是有意为之的,因为您有一个热可观察对象,并且您可能有多个订阅者。如果一个断开连接,那么其他人仍然希望从流中获得值。换句话说,发布运算符告诉源它可以根据需要生成值。
--inner stream--> publish/connect-> --outer stream--> subscriber1
--outer stream--> subscriber2
您可以通过删除 retry
来验证该行为。数字 1 和 2 仍将被打印。如果您希望源停止生成值,您可以使用 refCount
而不是 autoConnect
。 refCount
如果没有更多订阅者,则取消内部流。
棘手的部分来了:这个流是同步的并且 Streams are just functions。它在幕后有点复杂,但是重试运算符 运行s 中的订阅函数直到内部流完成。只有这样它才会重新订阅。
它与异步流不同,例如使用 Flux.interval
创建。
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1)
.subscribe()
当您调用 .subscribe()
时,retry
将在内部调用 doOnNext
上的订阅,依此类推,直到第一个 doOnNext
调用 Flux 上的 subscribe
。间隔 Flux 表示 "Ok, I'll emit the first value in one second" 并且订阅已建立。
一秒后发出一个值,抛出一个错误,重试运算符再次订阅。
在您的示例中,在订阅完全建立之前,Flux 已经在订阅调用期间开始发出值,可以这么说。
Flux.just(1, 2, 3)
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1, error -> true)
.subscribe(number -> System.out.println("I will never be here..."),
error -> System.out.println("will I be here? " + error),
() -> System.out.println("completed!"));
输出:
producing number: 1
throwing error.
producing number: 2
producing number: 3
预期输出:(根据我的逻辑 - 我确定我错了)
producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3
为什么输出与预期输出不同?
这有点棘手,但归结为:
调用 publish
和 autoConnect
的效果是,一旦您订阅,就会创建一个内部订阅,即使外部订阅被取消,该订阅也会持续存在。将其视为由 publish
和 autoConnect
.
--inner stream--> publish/connect-> --outer stream--> subscriber
无论外流发生什么,内流都会继续运行。这是有意为之的,因为您有一个热可观察对象,并且您可能有多个订阅者。如果一个断开连接,那么其他人仍然希望从流中获得值。换句话说,发布运算符告诉源它可以根据需要生成值。
--inner stream--> publish/connect-> --outer stream--> subscriber1
--outer stream--> subscriber2
您可以通过删除 retry
来验证该行为。数字 1 和 2 仍将被打印。如果您希望源停止生成值,您可以使用 refCount
而不是 autoConnect
。 refCount
如果没有更多订阅者,则取消内部流。
棘手的部分来了:这个流是同步的并且 Streams are just functions。它在幕后有点复杂,但是重试运算符 运行s 中的订阅函数直到内部流完成。只有这样它才会重新订阅。
它与异步流不同,例如使用 Flux.interval
创建。
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1)
.subscribe()
当您调用 .subscribe()
时,retry
将在内部调用 doOnNext
上的订阅,依此类推,直到第一个 doOnNext
调用 Flux 上的 subscribe
。间隔 Flux 表示 "Ok, I'll emit the first value in one second" 并且订阅已建立。
一秒后发出一个值,抛出一个错误,重试运算符再次订阅。
在您的示例中,在订阅完全建立之前,Flux 已经在订阅调用期间开始发出值,可以这么说。