RxJava onErrorResumeNext scheduler
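I have an observable that may fail with a specific exception, and in that case I want to show a dialog with a retry button. I've seen an existing answer, but it doesn't quite do what I want. I couldn't get retryWhen to solve my problem, so I used onErrorResumeNext instead. If you can think of a way to do the same thing with retryWhen, please tell me.
Right now I have this code: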
public Observable<Order> proceedWithOrdering(Activity activity) {
    return apiService.createOrder()
            .subscribeOn(Schedulers.io())
            .compose(applyRetryLogic(activity))
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread());
}

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .onErrorResumeNext(retry(observable, activity))
            .subscribeOn(AndroidSchedulers.mainThread());
}

public <T> Func1<Throwable, ? extends Observable<? extends T>> retry(Observable toRetry, Activity activity) {
    return throwable -> {
        if (throwable instanceof NetworkException) {
            MaterialDialog dialog = retryDialog(activity);
            View retry = dialog.getActionButton(DialogAction.POSITIVE);
            View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
            Observable<Object> retryClick = RxView.clicks(retry).map(o -> {
                dialog.dismiss();
                return o;
            });
            Observable<Object> cancelClick = RxView.clicks(cancel).flatMap(o -> {
                dialog.dismiss();
                return Observable.error(throwable);
            });
            dialog.show();
            return Observable.amb(retryClick, cancelClick)
                    .flatMap(o -> toRetry.compose(applyRetryLogic(activity)));
        } else {
            return Observable.error(throwable);
        }
    };
}
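The problem is that call in retry is not executed on the main thread, and it throws a Can't create handler inside thread that has not called Looper.prepare() exception.
The question is: how can I force it to execute on the main thread? As you can see, I have already tried calling subscribeOn(AndroidSchedulers.mainThread()) right after both compose and onErrorResumeNext, but with no luck.
I have tested my code with simple observables that don't run on a separate thread, and it works fine.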
You can do this by flatMapping the error onto a PublishSubject and then signalling that subject once the relevant button is pressed. Here is a classic Java Swing example:
import java.util.concurrent.atomic.AtomicInteger;

import javax.swing.JOptionPane;
import javax.swing.SwingUtilities;

import rx.Observable;
import rx.subjects.PublishSubject;

public class RetryWhenEnter {
    public static void main(String[] args) {
        AtomicInteger d = new AtomicInteger();
        Observable<Integer> source = Observable.just(1);
        source.flatMap(v -> {
            // Fail on the first two subscriptions, succeed on the third.
            if (d.incrementAndGet() < 3) {
                return Observable.error(new RuntimeException());
            }
            return Observable.just(v);
        })
        .retryWhen(err -> {
            return err.flatMap(e -> {
                System.out.println(Thread.currentThread() + " Error!");
                // One subject per error; it carries the user's decision.
                PublishSubject<Integer> choice = PublishSubject.create();
                SwingUtilities.invokeLater(() -> {
                    int c = JOptionPane.showConfirmDialog(null,
                            e.toString() + "\r\nRetry?", "Error",
                            JOptionPane.YES_NO_OPTION);
                    if (c == JOptionPane.YES_OPTION) {
                        choice.onNext(1);
                    } else {
                        choice.onCompleted();
                    }
                });
                return choice;
            });
        }).subscribe(System.out::println,
                Throwable::printStackTrace);
    }
}
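The idea of parking each failure on a one-shot signal that the UI completes later can also be pictured without RxJava. The sketch below is only a plain-JDK analogy, not the RxJava mechanism: a CompletableFuture<Boolean> stands in for the PublishSubject, and a single-thread executor stands in for the UI thread. The names RetryOnDecision, callWithRetry and demo are made up for illustration, and unlike retryWhen this version blocks the calling worker thread while it waits for the decision.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

final class RetryOnDecision {

    // Runs the task; on failure asks for a decision and waits for it.
    // true means "try again", false means "give up and rethrow".
    static <T> T callWithRetry(Supplier<T> task,
                               Function<RuntimeException, CompletableFuture<Boolean>> askUser) {
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                boolean retry = askUser.apply(e).join(); // blocks this worker thread
                if (!retry) {
                    throw e; // user declined: surface the original failure
                }
            }
        }
    }

    // Demo: the task fails twice, and a simulated UI thread always answers "retry".
    static int demo() {
        ExecutorService ui = Executors.newSingleThreadExecutor();
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new RuntimeException("attempt failed");
                }
                return "ok";
            }, error -> {
                CompletableFuture<Boolean> choice = new CompletableFuture<>();
                ui.execute(() -> choice.complete(true)); // stands in for a button click
                return choice;
            });
            return attempts.get();
        } finally {
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + demo()); // prints "attempts: 3"
    }
}
```

Because callWithRetry blocks while waiting, it must never be called from the thread that is supposed to deliver the decision. The subject-based retryWhen version avoids that constraint by staying asynchronous.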
Edit:
Alternatively, use observeOn(AndroidSchedulers.mainThread()) just before onErrorResumeNext, or inside retryWhen: retryWhen(o -> o.observeOn(AndroidSchedulers.mainThread())...).
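The thread hop that observeOn performs ahead of the error handler can be pictured with plain JDK classes. The sketch below is an analogy only, not RxJava: a CompletableFuture fails on a background "io" thread, and handleAsync with an explicit executor moves the error callback onto a single "main-sim" thread, roughly as observeOn(AndroidSchedulers.mainThread()) placed before onErrorResumeNext would. The class name, thread names and executors are assumptions made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ObserveOnAnalogy {

    // Returns the name of the thread on which the error callback ran.
    static String errorHandlerThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        try {
            // The work runs, and fails, on the "io" thread.
            CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
                throw new IllegalStateException("network down");
            }, io);
            // handleAsync hands the callback to the given executor,
            // so the error is handled on "main-sim" rather than on "io".
            return failing
                    .handleAsync((value, error) -> Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("error handled on: " + errorHandlerThread()); // prints "error handled on: main-sim"
    }
}
```

The point of the analogy is that where the source was started (subscribeOn) does not decide where a downstream callback runs; an explicit hop placed before the callback does.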
Edit 2
I rolled back my changes so that the answer makes sense again.
There is a way to solve my problem using retryWhen (thanks to @akarnokd):
public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .retryWhen(err -> err.flatMap(throwable -> {
                L.d(Thread.currentThread() + " Error!");
                if (throwable instanceof NetworkException) {
                    PublishSubject<Integer> choice = PublishSubject.create();
                    activity.runOnUiThread(() -> {
                        MaterialDialog dialog = retryDialog(activity);
                        View retry = dialog.getActionButton(DialogAction.POSITIVE);
                        View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
                        RxView.clicks(retry).subscribe(o -> {
                            dialog.dismiss();
                            choice.onNext(1);
                        });
                        RxView.clicks(cancel).subscribe(o -> {
                            dialog.dismiss();
                            choice.onError(throwable);
                        });
                        dialog.show();
                    });
                    return choice;
                } else {
                    return Observable.error(throwable);
                }
            }));
}