重试(或)重试时似乎不适用于热通量
retry (or) retryWhen does not seem to work with hot flux
我正在尝试为我的工作实现反应堆核心。我坚持在出现错误时需要执行的重试。下面是我添加任何错误之前的示例代码
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "A")))
.flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
当我这样做的时候。它工作正常。
来到错误案例,假设第二个操作(附加 "A")对一个项目失败。我正在尝试获得以下行为。
- 我尝试添加的部分"A"必须重试3次才能放弃
- 我也想在放弃之前重试整个 Flux 5 次
想知道我怎样才能达到同样的效果。
AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
System.out.println("Processing for adding A : " + o);
if(count.incrementAndGet() >= 25) {
throw new RuntimeException("More than 25th item.. Boom.. !!!");
} else {
return Mono.just(o + "A")));
}
}).retry(5)
.doOnError(throwable -> System.out.println("**** Inner Error"))
).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
当我在第一个 flatMap 中添加 retry(5) 时,如上所示,它工作正常,它为第 25 个进来的人重试附加 A 5 次 - 从日志中可以明显看出
我无法实现完整的通量重试(我上述要求中的第 (2) 点)。我尝试在第二个通量之后添加一个 .retry(3),认为它会重试整个通量。但它似乎并没有重试。有人可以帮忙吗?
AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
System.out.println("Processing for adding A : " + o);
if(count.incrementAndGet() >= 25) {
throw new RuntimeException("More than 25th item.. Boom.. !!!");
} else {
return Mono.just(o + "A")));
}
}).retry(5)
.doOnError(throwable -> System.out.println("**** Inner Error"))
).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.retry(3)
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
所有形式的 retry
都可以通过重新订阅 "retried" 来源来工作。它对冷 Flux
有奇效,但热 Flux
不太适应。
这里使用 publish()
转换,不能保证迟到的订阅者:因为重试被认为是迟到的订阅者,所以它什么也看不到,因为 publish
已经被原来的错误完成断开了。
您需要一种方法来保留最后一项(可能导致异常的一项)并为新订阅者重播(或者更确切地说,用于重试)。
另一个问题是您使用 create
来获取您存储在外部的 FluxSink
,这不是一个好的方法。
好消息是,这两个问题都可以通过使用 ReplayProcessor
一次性解决:你正确地得到了一个专用的接收器来手动推送数据,并且在错误的情况下 retry
将是能够从历史记录中获取错误触发值并再次尝试:
@Test
public void test() {
ReplayProcessor<String> foo =
ReplayProcessor.create(1);
FluxSink<String> sink = foo.sink();
foo.subscribe(System.out::println, System.out::println);
AtomicInteger transientError = new AtomicInteger(5);
foo.map(v -> "C".equals(v) && transientError.decrementAndGet() >= 0 ? v + (100 / 0) : v)
.doOnError(e -> System.err.println("Error, should be retried: " + e))
.retry(5)
.subscribe(System.err::println, System.err::println);
sink.next("A");
sink.next("B");
sink.next("C");
sink.complete();
}
这会打印:
A
B
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
A
C
B
C
我正在尝试为我的工作实现反应堆核心。我坚持在出现错误时需要执行的重试。下面是我添加任何错误之前的示例代码
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "A")))
.flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
当我这样做的时候。它工作正常。
来到错误案例,假设第二个操作(附加 "A")对一个项目失败。我正在尝试获得以下行为。
- 我尝试添加的部分"A"必须重试3次才能放弃
- 我也想在放弃之前重试整个 Flux 5 次
想知道我怎样才能达到同样的效果。
AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
System.out.println("Processing for adding A : " + o);
if(count.incrementAndGet() >= 25) {
throw new RuntimeException("More than 25th item.. Boom.. !!!");
} else {
return Mono.just(o + "A")));
}
}).retry(5)
.doOnError(throwable -> System.out.println("**** Inner Error"))
).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
当我在第一个 flatMap 中添加 retry(5) 时,如上所示,它工作正常,它为第 25 个进来的人重试附加 A 5 次 - 从日志中可以明显看出
我无法实现完整的通量重试(我上述要求中的第 (2) 点)。我尝试在第二个通量之后添加一个 .retry(3),认为它会重试整个通量。但它似乎并没有重试。有人可以帮忙吗?
AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
System.out.println("Processing for adding A : " + o);
if(count.incrementAndGet() >= 25) {
throw new RuntimeException("More than 25th item.. Boom.. !!!");
} else {
return Mono.just(o + "A")));
}
}).retry(5)
.doOnError(throwable -> System.out.println("**** Inner Error"))
).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
.retry(3)
.log()
.subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
System.out.println("Publishing " + pendingItems + " item");
mainSink.next(String.valueOf(pendingItems));
System.out.println("Published " + pendingItems + " item");
pendingItems--;
}
所有形式的 retry
都可以通过重新订阅 "retried" 来源来工作。它对冷 Flux
有奇效,但热 Flux
不太适应。
这里使用 publish()
转换,不能保证迟到的订阅者:因为重试被认为是迟到的订阅者,所以它什么也看不到,因为 publish
已经被原来的错误完成断开了。
您需要一种方法来保留最后一项(可能导致异常的一项)并为新订阅者重播(或者更确切地说,用于重试)。
另一个问题是您使用 create
来获取您存储在外部的 FluxSink
,这不是一个好的方法。
好消息是,这两个问题都可以通过使用 ReplayProcessor
一次性解决:你正确地得到了一个专用的接收器来手动推送数据,并且在错误的情况下 retry
将是能够从历史记录中获取错误触发值并再次尝试:
@Test
public void test() {
ReplayProcessor<String> foo =
ReplayProcessor.create(1);
FluxSink<String> sink = foo.sink();
foo.subscribe(System.out::println, System.out::println);
AtomicInteger transientError = new AtomicInteger(5);
foo.map(v -> "C".equals(v) && transientError.decrementAndGet() >= 0 ? v + (100 / 0) : v)
.doOnError(e -> System.err.println("Error, should be retried: " + e))
.retry(5)
.subscribe(System.err::println, System.err::println);
sink.next("A");
sink.next("B");
sink.next("C");
sink.complete();
}
这会打印:
A
B
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
A
C
B
C