RxJava:重试地图操作

RxJava: retrying map actions

我有一个 Observable,其中每个项目的转换方式可能会导致异常,但可以重试。我不希望失败中断流,因为每个项目都代表一个独立的事务。我能想到的最佳解决方案是:

    final AtomicLong errCount = new AtomicLong();
    Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    if (aLong == 2 && errCount.getAndIncrement() < 1) {
                        throw new RuntimeException("retryable error");
                    }
                    return aLong * 100;
                }
            }).retry(2);
        }
    }).forEach(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println(aLong);
        }
    });

// Desired output: 100, 200, 300 (not 100, 100, 200, 300)

问题:

The retry logic is really verbose.

您可以通过切换到 Observable.just(t1, t2, t3) 构造函数来完全避免使用 ImmutableList。这基本上做同样的事情,但不那么冗长。

我看到您在进行 flatMapping,以便将每个值转换为 Observable。这将防止在映射单个值时遇到的 onError 取消订阅整个链。因此,当运算符抛出时,它将取消订阅该值的内部可观察链。否则,错误会导致取消订阅并重新订阅主要的外部可观察对象。

如果你想保持这种行为但减少样板(除了明显切换到 Java8 lambda 之外,我可以想到 2 个选择。

首先,在 post 重试后重新订阅并删除重复数据。如果您有一个具有良好 hashcodeequals 实现的值,您可以使用过滤器附加到有状态集,并且仅当该集尚未包含该值时才使用 onNext。

Observable.<Long> just(1L, 2L, 3L)
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long aLong) {
                if (aLong == 2 && errCount.getAndIncrement() < 1) {
                    throw new RuntimeException("retryable error");
                }
                return aLong * 100;
            }})
        .retry(2)
        .filter(new Func1<Long, Boolean>() {
            Set<Long> state = null;

            @Override
            public Boolean call(Long a) {
                if (state == null)
                    state = new HashSet<Long>();
                if (!state.contains(a)) {
                    state.add(a);
                    return true;
                }
                return false;
            }})
        .forEach(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println(aLong);
            }});

其次,您可以将可观察对象从重新订阅时停止的位置切换到 resume。请注意,当使用缓冲运算符(observeOn、merge、flatMap)时,这可能会导致数据丢失问题。这是因为他们将继续以与下游消费者脱钩的方式从生产者那里消费。因此,您需要确保在重试之前不进行缓冲。如果您要实现支持背压的可观察源,还有其他注意事项。

// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()

// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)

// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)

If any item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results like Finagle's Try, so I can process all the exceptions.

您可以将不可靠的地图从 Long -> Long 更改为 Long -> Tuple<Long, List<Exception>>。由于这是一大堆泛型并且很快变得很麻烦,我建议使用重试运算符的不同变体,即 retryWhen(Func1<Observable<Throwable>, Observable<?>>)。以下是如何在您的代码中使用它的示例。

}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
    final AtomicInteger count = new AtomicInteger();
    return o.filter(new Func1<Throwable, Boolean>() {
        @Override
        public Boolean call(Throwable t) {
            return t instanceof RuntimeException || count.getAndIncrement() < 5;
        }}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})

使用 retryWhen 的好处是您可以在一段时间后以非阻塞方式轻松实现延迟重试。