重复触发事件流的操作的正确方法
Proper way to repeat an operation firing off an event stream
我是 RxJava 的新手,所以我仍在努力了解它。我有一个 Observable
代表按钮点击流,所以它很热。每次单击该按钮时,我都想做一些 I/O。如果失败,我想重复并尝试再次 I/O ,直到成功。这似乎是使用 retry()
或 repeat()
的好机会,但它们仅适用于热可观察对象,而不适用于冷对象。
这里有一些伪代码来实现我正在尝试做的事情:
buttonRequests
.map(actionEvent -> doIO())
.repeatAboveIfFailedUntilIOSucceeds()
.subscribe(...);
我考虑过使用 flatMap
来复制事件,而不是使用 skip
来忽略剩余的事件(如果它成功的话),但这并不能完全让我获得不确定次数的尝试。
思考这个问题的正确方法是什么?
请看测试。在每个事件中,都会触发一个新的 IO-Request。 Switch-Map 类似于 Flat-Map,但它会在新的上游事件到来时取消最近的订阅。如果您使用并发,Flat-Map 只会开始一个新订阅。因此,假设您的热可观察对象触发了一个事件,并且 flatMap 开始在另一个线程(subscribeOn)上执行您的 IO 工作。如果另一个事件到来,而最后一个事件仍在执行,它将开始执行另一个 IO 任务。 Switch-Map 将取消订阅最后一个并为当前事件启动一个。让我们看一下 retry() 运算符。重试将重新订阅 'ioWorkWrapped' 提供的可观察对象,直到可观察对象以 onComplete 结束。这可能非常危险,因为想象它每次尝试都会失败。它会永远旋转。建议使用 'exponential-backoff' 并在 X 次尝试后提供备用可观察失败。 'retryWhen'的用法请看这本好书: Reactive Programming with RxJava
public class LibraryTest {
private AtomicInteger idx;
@Before
public void setUp() throws Exception {
idx = new AtomicInteger(0);
}
@Test
public void name() throws Exception {
Observable<String> stringObservable = Observable.just(1)
.switchMap(integer -> ioWorkWrapped()
.doOnError(throwable -> System.out.println("Something went wrong."))
.retry()
);
stringObservable.test()
.await()
.assertResult("value");
}
private Observable<String> ioWorkWrapped() {
return Observable.defer(() -> {
try {
Thread.sleep(500); // IO Work
if (idx.getAndIncrement() < 5) { // for testing...
return Observable.error(new IllegalStateException("Wurst"));
}
return Observable.just("value");
} catch (Exception ex) {
return Observable.error(ex);
}
});
}
}
您需要使用运算符 retryWhen
,以防您的 I/O 操作失败,您可以抛出在运算符中检查的可运行异常。如果是您重试的那种类型的异常。
本例中我们重试4次。但是这个条件可以通过我们收到的 throwable 类型来改变。
int count=0;
@Test
public void retryWhenConnectionError() {
Subscription subscription = Observable.just(null)
.map(connection -> {
System.out.println("Trying to open connection");
connection.toString();
return connection;
})
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) :
Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.subscribe(s -> System.out.println(s));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}
我是 RxJava 的新手,所以我仍在努力了解它。我有一个 Observable
代表按钮点击流,所以它很热。每次单击该按钮时,我都想做一些 I/O。如果失败,我想重复并尝试再次 I/O ,直到成功。这似乎是使用 retry()
或 repeat()
的好机会,但它们仅适用于热可观察对象,而不适用于冷对象。
这里有一些伪代码来实现我正在尝试做的事情:
buttonRequests
.map(actionEvent -> doIO())
.repeatAboveIfFailedUntilIOSucceeds()
.subscribe(...);
我考虑过使用 flatMap
来复制事件,而不是使用 skip
来忽略剩余的事件(如果它成功的话),但这并不能完全让我获得不确定次数的尝试。
思考这个问题的正确方法是什么?
请看测试。在每个事件中,都会触发一个新的 IO-Request。 Switch-Map 类似于 Flat-Map,但它会在新的上游事件到来时取消最近的订阅。如果您使用并发,Flat-Map 只会开始一个新订阅。因此,假设您的热可观察对象触发了一个事件,并且 flatMap 开始在另一个线程(subscribeOn)上执行您的 IO 工作。如果另一个事件到来,而最后一个事件仍在执行,它将开始执行另一个 IO 任务。 Switch-Map 将取消订阅最后一个并为当前事件启动一个。让我们看一下 retry() 运算符。重试将重新订阅 'ioWorkWrapped' 提供的可观察对象,直到可观察对象以 onComplete 结束。这可能非常危险,因为想象它每次尝试都会失败。它会永远旋转。建议使用 'exponential-backoff' 并在 X 次尝试后提供备用可观察失败。 'retryWhen'的用法请看这本好书: Reactive Programming with RxJava
public class LibraryTest {
private AtomicInteger idx;
@Before
public void setUp() throws Exception {
idx = new AtomicInteger(0);
}
@Test
public void name() throws Exception {
Observable<String> stringObservable = Observable.just(1)
.switchMap(integer -> ioWorkWrapped()
.doOnError(throwable -> System.out.println("Something went wrong."))
.retry()
);
stringObservable.test()
.await()
.assertResult("value");
}
private Observable<String> ioWorkWrapped() {
return Observable.defer(() -> {
try {
Thread.sleep(500); // IO Work
if (idx.getAndIncrement() < 5) { // for testing...
return Observable.error(new IllegalStateException("Wurst"));
}
return Observable.just("value");
} catch (Exception ex) {
return Observable.error(ex);
}
});
}
}
您需要使用运算符 retryWhen
,以防您的 I/O 操作失败,您可以抛出在运算符中检查的可运行异常。如果是您重试的那种类型的异常。
本例中我们重试4次。但是这个条件可以通过我们收到的 throwable 类型来改变。
int count=0;
@Test
public void retryWhenConnectionError() {
Subscription subscription = Observable.just(null)
.map(connection -> {
System.out.println("Trying to open connection");
connection.toString();
return connection;
})
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) :
Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.subscribe(s -> System.out.println(s));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}