重复触发事件流的操作的正确方法

Proper way to repeat an operation firing off an event stream

我是 RxJava 的新手,所以我仍在努力了解它。我有一个 Observable 代表按钮点击流,所以它很热。每次单击该按钮时,我都想做一些 I/O。如果失败,我想重复并尝试再次 I/O ,直到成功。这似乎是使用 retry()repeat() 的好机会,但它们仅适用于热可观察对象,而不适用于冷对象。

这里有一些伪代码来实现我正在尝试做的事情:

buttonRequests
   .map(actionEvent -> doIO())
   .repeatAboveIfFailedUntilIOSucceeds()
   .subscribe(...);

我考虑过使用 flatMap 来复制事件,而不是使用 skip 来忽略剩余的事件(如果它成功的话),但这并不能完全让我获得不确定次数的尝试。

思考这个问题的正确方法是什么?

请看测试。在每个事件中,都会触发一个新的 IO-Request。 Switch-Map 类似于 Flat-Map,但它会在新的上游事件到来时取消最近的订阅。如果您使用并发,Flat-Map 只会开始一个新订阅。因此,假设您的热可观察对象触发了一个事件,并且 flatMap 开始在另一个线程(subscribeOn)上执行您的 IO 工作。如果另一个事件到来,而最后一个事件仍在执行,它将开始执行另一个 IO 任务。 Switch-Map 将取消订阅最后一个并为当前事件启动一个。让我们看一下 retry() 运算符。重试将重新订阅 'ioWorkWrapped' 提供的可观察对象,直到可观察对象以 onComplete 结束。这可能非常危险,因为想象它每次尝试都会失败。它会永远旋转。建议使用 'exponential-backoff' 并在 X 次尝试后提供备用可观察失败。 'retryWhen'的用法请看这本好书: Reactive Programming with RxJava

public class LibraryTest {
    private AtomicInteger idx;

    @Before
    public void setUp() throws Exception {
        idx = new AtomicInteger(0);
    }

    @Test
    public void name() throws Exception {
        Observable<String> stringObservable = Observable.just(1)
                .switchMap(integer -> ioWorkWrapped()
                        .doOnError(throwable -> System.out.println("Something went wrong."))
                        .retry()
                );

        stringObservable.test()
                .await()
                .assertResult("value");


    }

    private Observable<String> ioWorkWrapped() {
        return Observable.defer(() -> {
            try {
                Thread.sleep(500); // IO Work
                if (idx.getAndIncrement() < 5) { // for testing...
                    return Observable.error(new IllegalStateException("Wurst"));
                }
                return Observable.just("value");
            } catch (Exception ex) {
                return Observable.error(ex);
            }
        });
    }
}

您需要使用运算符 retryWhen,以防您的 I/O 操作失败,您可以抛出在运算符中检查的可运行异常。如果是您重试的那种类型的异常。

本例中我们重试4次。但是这个条件可以通过我们收到的 throwable 类型来改变。

int count=0;

@Test
public void retryWhenConnectionError() {
    Subscription subscription = Observable.just(null)
            .map(connection -> {
                System.out.println("Trying to open connection");
                connection.toString();
                return connection;
            })
            .retryWhen(errors -> errors.doOnNext(o -> count++)
                            .flatMap(t -> count > 3 ? Observable.error(t) :
                                    Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                    Schedulers.newThread())
            .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

您可以在此处查看更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java