RxJava onErrorResumeNext 调用 java.io.InterrupedIOException
RxJava onErrorResumeNext calls java.io.InterrupedIOException
我对 RxJava onErrorResumeNext 运算符有疑问。
我想获取位置,然后根据位置从服务器获取数据(使用 Retrofit),但是如果没有位置(错误:序列不包含元素),我想使用另一个 Observable 从服务器获取数据(不依赖于位置) ).我尝试使用 onErrorResumeNext 运算符,但得到 "java.io.InterruptedIOException: thread interrupted".
添加 onErrorResumeNext 之前的代码 - 效果很好
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
带有 onErrorResumeNext 的代码 - 抛出异常
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
.onErrorResumeNext(RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
堆栈跟踪:
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: java.io.InterruptedIOException: thread interrupted
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: at okio.Timeout.throwIfReached(Timeout.java:145)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.Okio.write(Okio.java:77)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.AsyncTimeout.write(AsyncTimeout.java:155)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.RealBufferedSink.flush(RealBufferedSink.java:221)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.Http1xStream.finishRequest(Http1xStream.java:159)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.readNetworkResponse(HttpEngine.java:721)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.access0(HttpEngine.java:81)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:708)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:563)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.getResponse(RealCall.java:241)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:198)
07-27 22:33:58.391 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.execute(RealCall.java:57)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request(RxJavaCallAdapterFactory.java:171)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at rx.Subscriber.setProducer(Subscriber.java:211)
07-27 22:33:58.393 18632-18632/com.blacksea.plamobi W/System.err: at rx.Subscriber.setProducer(Subscriber.java:205)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:152)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:138)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err: at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:65)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:37)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorOnErrorResumeNextViaFunction.onError(OperatorOnErrorResumeNextViaFunction.java:141)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:276)
07-27 22:33:58.401 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMap$MapSubscriber.onError(OperatorMap.java:85)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:844)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorSubscribeOn.onError(OperatorSubscribeOn.java:59)
07-27 22:33:58.405 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:264)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:207)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:237)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.lang.Thread.run(Thread.java:818)
根据我的经验,java.io.InterruptedIOException
是一个转移注意力的问题,因为它通常意味着某些 其他 线程未订阅,而任何线程 运行 (f.e。网络请求)被中断。
在您的情况下,无论是否有错误,每次都会调用 RestService.getPostsByMapProjection
,这可能不是您想要的行为;考虑将其包装在 Observable.defer()
中
您可以在 flatMap 上使用最大并发性以确保您没有任何并发问题
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func), maxConcurrent);
}
在你的代码中
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10),1) //offset = 0, limit = 10;
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如果使用 retrofit2,它的 observables 不会改变它们操作的线程,所以直到 subscribeOn() 的所有链都在 io 调度程序的单个线程上操作。显然来自 RestService.getPostsAround
的错误为该线程设置了中断标志,而 okio 在 RestService.getPostsByMapProjection
中阻塞了它。您可以尝试在 onErrorResumeNext()
.
中的 RestService.getPostsByMapProjection
之后添加 subscribeOn(Schedulers.io())
我对 RxJava onErrorResumeNext 运算符有疑问。 我想获取位置,然后根据位置从服务器获取数据(使用 Retrofit),但是如果没有位置(错误:序列不包含元素),我想使用另一个 Observable 从服务器获取数据(不依赖于位置) ).我尝试使用 onErrorResumeNext 运算符,但得到 "java.io.InterruptedIOException: thread interrupted".
添加 onErrorResumeNext 之前的代码 - 效果很好
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
带有 onErrorResumeNext 的代码 - 抛出异常
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
.onErrorResumeNext(RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
堆栈跟踪:
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: java.io.InterruptedIOException: thread interrupted
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: at okio.Timeout.throwIfReached(Timeout.java:145)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.Okio.write(Okio.java:77)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.AsyncTimeout.write(AsyncTimeout.java:155)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err: at okio.RealBufferedSink.flush(RealBufferedSink.java:221)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.Http1xStream.finishRequest(Http1xStream.java:159)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.readNetworkResponse(HttpEngine.java:721)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.access0(HttpEngine.java:81)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:708)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:563)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.getResponse(RealCall.java:241)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:198)
07-27 22:33:58.391 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at okhttp3.RealCall.execute(RealCall.java:57)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request(RxJavaCallAdapterFactory.java:171)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err: at rx.Subscriber.setProducer(Subscriber.java:211)
07-27 22:33:58.393 18632-18632/com.blacksea.plamobi W/System.err: at rx.Subscriber.setProducer(Subscriber.java:205)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:152)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:138)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err: at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:65)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:37)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err: at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorOnErrorResumeNextViaFunction.onError(OperatorOnErrorResumeNextViaFunction.java:141)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:276)
07-27 22:33:58.401 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMap$MapSubscriber.onError(OperatorMap.java:85)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:844)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorSubscribeOn.onError(OperatorSubscribeOn.java:59)
07-27 22:33:58.405 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:264)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:207)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:237)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err: at java.lang.Thread.run(Thread.java:818)
根据我的经验,java.io.InterruptedIOException
是一个转移注意力的问题,因为它通常意味着某些 其他 线程未订阅,而任何线程 运行 (f.e。网络请求)被中断。
在您的情况下,无论是否有错误,每次都会调用 RestService.getPostsByMapProjection
,这可能不是您想要的行为;考虑将其包装在 Observable.defer()
您可以在 flatMap 上使用最大并发性以确保您没有任何并发问题
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func), maxConcurrent);
}
在你的代码中
LocationService.getUpdatedOrLastKnownLocation(getContext()))
.flatMap(location -> RestService.getPostsAround(location,0,10),1) //offset = 0, limit = 10;
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如果使用 retrofit2,它的 observables 不会改变它们操作的线程,所以直到 subscribeOn() 的所有链都在 io 调度程序的单个线程上操作。显然来自 RestService.getPostsAround
的错误为该线程设置了中断标志,而 okio 在 RestService.getPostsByMapProjection
中阻塞了它。您可以尝试在 onErrorResumeNext()
.
RestService.getPostsByMapProjection
之后添加 subscribeOn(Schedulers.io())