RxJava 在 10 秒后发送 onError()
RxJava send onError() after 10secs
我将 Android
应用程序中的不同操作与 RxJava
相结合,我希望流程成功完成并在 onNext()
中提交项目,或者在 10 秒后,onError
会被抛出。
我用 timeout
尝试过这样的事情:
Observable.from(list)
.doOnNext(new Action1<List<String>>() {
@Override
public void call(List<String> list) {
//do something here
}
})
.filter(new Func1<List<String>>, Boolean>() {
@Override
public Boolean call(List<String> list) {
return list != null;
}
})
.flatMap(new Func1<List<String>>, Observable<MyResponse>>() {
@Override
public Observable<MyResponse> call(List<String> list) {
//flatmap something here
return Observable.just(new MyResponse(list));
}
})
.flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() {
@Override
public Observable<AnotherResponse> call(MyResponse myResponse) {
//do something here
return Observable.just(new AnotherResponse(myResponse));
}
})
.timeout(10, TimeUnit.SECONDS)
.subscribe(new Subscriber<AnotherResponse>()) {
//do Subscription stuff here
});
但这无论如何都会引发超时,如果上面列出的流程在 10 秒内未成功完成,我只想跳转到 onError
。有什么建议可以实现吗?
正如克里斯托弗在他的评论中所说,你得到错误的原因是 timeout
将抛出一个 TimeoutException
每当一个未完成的 Observable(onCompleted
尚未被调用) 未能在设定的超时时间内生成下一个 onNext
。
因为我不确定你的源 Observable - 或者更确切地说,flatMap
s 中的 Observables - 在做什么,我会首先检查它是否应该实际产生一个 onCompleted
(可能在 at至少一个 onNext
) 或它是否保持打开状态 "by design"(可能是源是开放式流,例如您设备的网络状态)。如果源代码本身是开放式的,您可以在第一个 onNext
之后人为地引入一个 onCompleted
,只需将 take(1)
添加到您的链中即可。
也许这对你有帮助。
private Observable<String> myMethod(final List<String> list) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
// Your long lasting operation
Observable<String> listObservable = Observable.from(list);
// This is just my way to make slow operation (I have 10 items in my list)
Observable<String> delayedObservable = listObservable.zipWith(Observable.interval(2, TimeUnit.SECONDS), new Func2<String,
Long,
String>() {
@Override public String call(String s, Long aLong) {
return s;
}
});
delayedObservable.subscribe(subscriber);
Runnable r = new Runnable() {
@Override public void run() {
// This thread makes the timeout, it is up to you if you would keep this like this.
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onError(new TimeoutException());
subscriber.onCompleted();
}
};
Thread thread = new Thread(r);
thread.start();
}
});
}
这对我有用,在非常基本的情况下,我希望它能帮助你。
我将 Android
应用程序中的不同操作与 RxJava
相结合,我希望流程成功完成并在 onNext()
中提交项目,或者在 10 秒后,onError
会被抛出。
我用 timeout
尝试过这样的事情:
Observable.from(list)
.doOnNext(new Action1<List<String>>() {
@Override
public void call(List<String> list) {
//do something here
}
})
.filter(new Func1<List<String>>, Boolean>() {
@Override
public Boolean call(List<String> list) {
return list != null;
}
})
.flatMap(new Func1<List<String>>, Observable<MyResponse>>() {
@Override
public Observable<MyResponse> call(List<String> list) {
//flatmap something here
return Observable.just(new MyResponse(list));
}
})
.flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() {
@Override
public Observable<AnotherResponse> call(MyResponse myResponse) {
//do something here
return Observable.just(new AnotherResponse(myResponse));
}
})
.timeout(10, TimeUnit.SECONDS)
.subscribe(new Subscriber<AnotherResponse>()) {
//do Subscription stuff here
});
但这无论如何都会引发超时,如果上面列出的流程在 10 秒内未成功完成,我只想跳转到 onError
。有什么建议可以实现吗?
正如克里斯托弗在他的评论中所说,你得到错误的原因是 timeout
将抛出一个 TimeoutException
每当一个未完成的 Observable(onCompleted
尚未被调用) 未能在设定的超时时间内生成下一个 onNext
。
因为我不确定你的源 Observable - 或者更确切地说,flatMap
s 中的 Observables - 在做什么,我会首先检查它是否应该实际产生一个 onCompleted
(可能在 at至少一个 onNext
) 或它是否保持打开状态 "by design"(可能是源是开放式流,例如您设备的网络状态)。如果源代码本身是开放式的,您可以在第一个 onNext
之后人为地引入一个 onCompleted
,只需将 take(1)
添加到您的链中即可。
也许这对你有帮助。
private Observable<String> myMethod(final List<String> list) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
// Your long lasting operation
Observable<String> listObservable = Observable.from(list);
// This is just my way to make slow operation (I have 10 items in my list)
Observable<String> delayedObservable = listObservable.zipWith(Observable.interval(2, TimeUnit.SECONDS), new Func2<String,
Long,
String>() {
@Override public String call(String s, Long aLong) {
return s;
}
});
delayedObservable.subscribe(subscriber);
Runnable r = new Runnable() {
@Override public void run() {
// This thread makes the timeout, it is up to you if you would keep this like this.
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onError(new TimeoutException());
subscriber.onCompleted();
}
};
Thread thread = new Thread(r);
thread.start();
}
});
}
这对我有用,在非常基本的情况下,我希望它能帮助你。