RxJava 在 10 秒后发送 onError()

RxJava send onError() after 10secs

我将 Android 应用程序中的不同操作与 RxJava 相结合,我希望流程成功完成并在 onNext() 中提交项目,或者在 10 秒后,onError 会被抛出。

我用 timeout 尝试过这样的事情:

Observable.from(list)
            .doOnNext(new Action1<List<String>>() {
                @Override
                public void call(List<String> list) {
                    //do something here
                }
            })
            .filter(new Func1<List<String>>, Boolean>() {
                @Override
                public Boolean call(List<String> list) {
                    return list != null;
                }
            })
            .flatMap(new Func1<List<String>>, Observable<MyResponse>>() {
                @Override
                public Observable<MyResponse> call(List<String> list) {
                    //flatmap something here
                    return Observable.just(new MyResponse(list));
                }

            })
            .flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() {
                @Override
                public Observable<AnotherResponse> call(MyResponse myResponse) {
                    //do something here
                    return Observable.just(new AnotherResponse(myResponse));
                }
            })
            .timeout(10, TimeUnit.SECONDS)
            .subscribe(new Subscriber<AnotherResponse>()) {
                //do Subscription stuff here
            });

但这无论如何都会引发超时,如果上面列出的流程在 10 秒内未成功完成,我只想跳转到 onError。有什么建议可以实现吗?

正如克里斯托弗在他的评论中所说,你得到错误的原因是 timeout 将抛出一个 TimeoutException 每当一个未完成的 Observable(onCompleted 尚未被调用) 未能在设定的超时时间内生成下一个 onNext

因为我不确定你的源 Observable - 或者更确切地说,flatMaps 中的 Observables - 在做什么,我会首先检查它是否应该实际产生一个 onCompleted (可能在 at至少一个 onNext) 或它是否保持打开状态 "by design"(可能是源是开放式流,例如您设备的网络状态)。如果源代码本身是开放式的,您可以在第一个 onNext 之后人为地引入一个 onCompleted,只需将 take(1) 添加到您的链中即可。

也许这对你有帮助。

private Observable<String> myMethod(final List<String> list) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override public void call(final Subscriber<? super String> subscriber) {

            // Your long lasting operation
            Observable<String> listObservable = Observable.from(list);

            // This is just my way to make slow operation (I have 10 items in my list)
            Observable<String> delayedObservable = listObservable.zipWith(Observable.interval(2, TimeUnit.SECONDS), new Func2<String,
                    Long,
                    String>() {
                @Override public String call(String s, Long aLong) {
                    return s;
                }
            });
            delayedObservable.subscribe(subscriber);

            Runnable r = new Runnable() {
                @Override public void run() {
                    // This thread makes the timeout, it is up to you if you would keep this like this.
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onError(new TimeoutException());
                    subscriber.onCompleted();
                }
            };
            Thread thread = new Thread(r);
            thread.start();
        }
    });
}

这对我有用,在非常基本的情况下,我希望它能帮助你。