重试(或)重试时似乎不适用于热通量

retry (or) retryWhen does not seem to work with hot flux

我正在尝试为我的工作实现反应堆核心。我坚持在出现错误时需要执行的重试。下面是我添加任何错误之前的示例代码

FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
   mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "A")))
       .flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
     System.out.println("Publishing " + pendingItems + " item");
     mainSink.next(String.valueOf(pendingItems));
     System.out.println("Published " + pendingItems + " item");
     pendingItems--;
}

当我这样做的时候。它工作正常。

来到错误案例,假设第二个操作(附加 "A")对一个项目失败。我正在尝试获得以下行为。

  1. 我尝试添加的部分"A"必须重试3次才能放弃
  2. 我也想在放弃之前重试整个 Flux 5 次

想知道我怎样才能达到同样的效果。

AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
   mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
                  System.out.println("Processing for adding A : " + o);
                  if(count.incrementAndGet() >= 25) {
                       throw new RuntimeException("More than 25th item.. Boom.. !!!");
                  } else {
                       return Mono.just(o + "A")));
                  }
            }).retry(5)
              .doOnError(throwable -> System.out.println("**** Inner Error"))
       ).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
     System.out.println("Publishing " + pendingItems + " item");
     mainSink.next(String.valueOf(pendingItems));
     System.out.println("Published " + pendingItems + " item");
     pendingItems--;
} 

当我在第一个 flatMap 中添加 retry(5) 时,如上所示,它工作正常,它为第 25 个进来的人重试附加 A 5 次 - 从日志中可以明显看出

我无法实现完整的通量重试(我上述要求中的第 (2) 点)。我尝试在第二个通量之后添加一个 .retry(3),认为它会重试整个通量。但它似乎并没有重试。有人可以帮忙吗?

AtomicInteger count = new AtomicInteger(0);
FluxSink<String> mainSink;
// Create the fulx and get handle to Sink
Flux<String> mainFlux = Flux.create(sink -> {
   mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to Hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations, add A and B to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
                  System.out.println("Processing for adding A : " + o);
                  if(count.incrementAndGet() >= 25) {
                       throw new RuntimeException("More than 25th item.. Boom.. !!!");
                  } else {
                       return Mono.just(o + "A")));
                  }
            }).retry(5)
              .doOnError(throwable -> System.out.println("**** Inner Error"))
       ).flatMap(o -> Mono.just(o).map(s -> Mono.just(o + "B")))
       .retry(3)
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while(pendingItems > 0) {
     System.out.println("Publishing " + pendingItems + " item");
     mainSink.next(String.valueOf(pendingItems));
     System.out.println("Published " + pendingItems + " item");
     pendingItems--;
} 

所有形式的 retry 都可以通过重新订阅 "retried" 来源来工作。它对冷 Flux 有奇效,但热 Flux 不太适应。

这里使用 publish() 转换,不能保证迟到的订阅者:因为重试被认为是迟到的订阅者,所以它什么也看不到,因为 publish 已经被原来的错误完成断开了。

您需要一种方法来保留最后一项(可能导致异常的一项)并为新订阅者重播(或者更确切地说,用于重试)。

另一个问题是您使用 create 来获取您存储在外部的 FluxSink,这不是一个好的方法。

好消息是,这两个问题都可以通过使用 ReplayProcessor 一次性解决:你正确地得到了一个专用的接收器来手动推送数据,并且在错误的情况下 retry 将是能够从历史记录中获取错误触发值并再次尝试:

@Test
public void test() {
    ReplayProcessor<String> foo =
            ReplayProcessor.create(1);
    FluxSink<String> sink = foo.sink();

    foo.subscribe(System.out::println, System.out::println);

    AtomicInteger transientError = new AtomicInteger(5);
    foo.map(v -> "C".equals(v) && transientError.decrementAndGet() >= 0 ? v + (100 / 0) : v)
            .doOnError(e -> System.err.println("Error, should be retried: " + e))
            .retry(5)
            .subscribe(System.err::println, System.err::println);

    sink.next("A");
    sink.next("B");
    sink.next("C");
    sink.complete();
}

这会打印:

A
B
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
A
C
B
C