RxJava:重试地图操作
RxJava: retrying map actions
我有一个 Observable,其中每个项目的转换方式可能会导致异常,但可以重试。我不希望失败中断流,因为每个项目都代表一个独立的事务。我能想到的最佳解决方案是:
final AtomicLong errCount = new AtomicLong();
Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}
}).retry(2);
}
}).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
// Desired output: 100, 200, 300 (not 100, 100, 200, 300)
问题:
- 重试逻辑真的很冗长。
- 如果任何项目在 2 次重试后失败,则流中断(不再处理更多项目)。我想要一种干净的方式来 return 像 Finagle 的 Try 这样的异常和结果,这样我就可以处理所有的异常。
The retry logic is really verbose.
您可以通过切换到 Observable.just(t1, t2, t3)
构造函数来完全避免使用 ImmutableList
。这基本上做同样的事情,但不那么冗长。
我看到您在进行 flatMapping,以便将每个值转换为 Observable。这将防止在映射单个值时遇到的 onError 取消订阅整个链。因此,当运算符抛出时,它将取消订阅该值的内部可观察链。否则,错误会导致取消订阅并重新订阅主要的外部可观察对象。
如果你想保持这种行为但减少样板(除了明显切换到 Java8 lambda 之外,我可以想到 2 个选择。
首先,在 post 重试后重新订阅并删除重复数据。如果您有一个具有良好 hashcode
和 equals
实现的值,您可以使用过滤器附加到有状态集,并且仅当该集尚未包含该值时才使用 onNext。
Observable.<Long> just(1L, 2L, 3L)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}})
.retry(2)
.filter(new Func1<Long, Boolean>() {
Set<Long> state = null;
@Override
public Boolean call(Long a) {
if (state == null)
state = new HashSet<Long>();
if (!state.contains(a)) {
state.add(a);
return true;
}
return false;
}})
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}});
其次,您可以将可观察对象从重新订阅时停止的位置切换到 resume。请注意,当使用缓冲运算符(observeOn、merge、flatMap)时,这可能会导致数据丢失问题。这是因为他们将继续以与下游消费者脱钩的方式从生产者那里消费。因此,您需要确保在重试之前不进行缓冲。如果您要实现支持背压的可观察源,还有其他注意事项。
// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()
// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)
// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)
If any item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results like Finagle's Try, so I can process all the exceptions.
您可以将不可靠的地图从 Long -> Long
更改为 Long -> Tuple<Long, List<Exception>>
。由于这是一大堆泛型并且很快变得很麻烦,我建议使用重试运算符的不同变体,即 retryWhen(Func1<Observable<Throwable>, Observable<?>>)
。以下是如何在您的代码中使用它的示例。
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
final AtomicInteger count = new AtomicInteger();
return o.filter(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable t) {
return t instanceof RuntimeException || count.getAndIncrement() < 5;
}}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})
使用 retryWhen 的好处是您可以在一段时间后以非阻塞方式轻松实现延迟重试。
我有一个 Observable,其中每个项目的转换方式可能会导致异常,但可以重试。我不希望失败中断流,因为每个项目都代表一个独立的事务。我能想到的最佳解决方案是:
final AtomicLong errCount = new AtomicLong();
Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}
}).retry(2);
}
}).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
// Desired output: 100, 200, 300 (not 100, 100, 200, 300)
问题:
- 重试逻辑真的很冗长。
- 如果任何项目在 2 次重试后失败,则流中断(不再处理更多项目)。我想要一种干净的方式来 return 像 Finagle 的 Try 这样的异常和结果,这样我就可以处理所有的异常。
The retry logic is really verbose.
您可以通过切换到 Observable.just(t1, t2, t3)
构造函数来完全避免使用 ImmutableList
。这基本上做同样的事情,但不那么冗长。
我看到您在进行 flatMapping,以便将每个值转换为 Observable。这将防止在映射单个值时遇到的 onError 取消订阅整个链。因此,当运算符抛出时,它将取消订阅该值的内部可观察链。否则,错误会导致取消订阅并重新订阅主要的外部可观察对象。
如果你想保持这种行为但减少样板(除了明显切换到 Java8 lambda 之外,我可以想到 2 个选择。
首先,在 post 重试后重新订阅并删除重复数据。如果您有一个具有良好 hashcode
和 equals
实现的值,您可以使用过滤器附加到有状态集,并且仅当该集尚未包含该值时才使用 onNext。
Observable.<Long> just(1L, 2L, 3L)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}})
.retry(2)
.filter(new Func1<Long, Boolean>() {
Set<Long> state = null;
@Override
public Boolean call(Long a) {
if (state == null)
state = new HashSet<Long>();
if (!state.contains(a)) {
state.add(a);
return true;
}
return false;
}})
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}});
其次,您可以将可观察对象从重新订阅时停止的位置切换到 resume。请注意,当使用缓冲运算符(observeOn、merge、flatMap)时,这可能会导致数据丢失问题。这是因为他们将继续以与下游消费者脱钩的方式从生产者那里消费。因此,您需要确保在重试之前不进行缓冲。如果您要实现支持背压的可观察源,还有其他注意事项。
// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()
// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)
// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)
If any item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results like Finagle's Try, so I can process all the exceptions.
您可以将不可靠的地图从 Long -> Long
更改为 Long -> Tuple<Long, List<Exception>>
。由于这是一大堆泛型并且很快变得很麻烦,我建议使用重试运算符的不同变体,即 retryWhen(Func1<Observable<Throwable>, Observable<?>>)
。以下是如何在您的代码中使用它的示例。
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
final AtomicInteger count = new AtomicInteger();
return o.filter(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable t) {
return t instanceof RuntimeException || count.getAndIncrement() < 5;
}}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})
使用 retryWhen 的好处是您可以在一段时间后以非阻塞方式轻松实现延迟重试。