onNext 和 onComplete 的行为

Behaviour of onNext and onComplete

我有一个 Observable 可以做一些事情而不需要发出一个值。我还有一个我希望 Observable 使用的对象列表。所以对于这个列表中的所有元素:doSomething()

Observable.from(uris)
        .flatMap(new Func1<Uri, Observable<Void>>() {
            @Override
            public Observable<Void> call(Uri uri) {
                return createDoSomethingObservable(uri);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Void>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "completed");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Void aVoid) {
                Log.d(TAG, "next");
            }
        });

以及创建 Observable 的方法:

Observable<Void> createDoSomethingObservable(final Uri uri) {
    return Observable.create(new Observable.OnSubscribe<Void>() {
        @Override
        public void call(Subscriber<? super Void> subscriber) {
            //doSomething
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    });
}

现在,当我 运行 使用包含 3 个元素的列表时,我得到:

next
next
next
completed

这很好,因为那是我想要的,但我不知道它为什么起作用。首先,我开始只调用 onComplete,因为最终 observable 完成了它的工作并完成了。但是当然永远不会在订户上调用 onNext 。反之亦然。

所以我的问题是:

  1. 为什么只对最后一个列表元素调用 onComplete?
  2. 有没有更好的方法解决这个问题?

onComplete(同onError)在observable链中只调用一次,rxjava就是这样实现的

我认为你的方法是正确的,所以不需要更好的方法。

onComplete 为最后一个元素调用,因为那是链中最早的可观察对象 (from(uris)) 完成的时间。

预计从 flatMap 发出的 observable 将调用 onComplete。完成后(并且 call 已返回),然后可以处理 from 的下一次发射。一旦 from 完成发出 observables,它会调用 onComplete 并且链有效地完成。

我认为,这段小代码可以帮助您理解 onNext( )onComplete() 的行为。
假设,你有一个 List<Uri>。让我们手动将其转换为 Observable<Uri> .

public static Observable<Uri> getUries(List<Uri> uriList){
    return Observable.create(new Observable.OnSubscribe<Uri>() {
        @Override
        public void call(Subscriber<? super Uri> subscriber) {
            for(Uri uri : uriList){
                subscriber.onNext(uri);
            }

            subscriber.onCompleted();
        }
    });
}

或使用 Lambda 表达式:

public static Observable<Uri> getUries(List<Uri> uriList){
    return Observable.create(subscriber -> {
        for(Uri uri : uriList){
            subscriber.onNext(uri);
        }

        subscriber.onCompleted();
    });
}

如您所见,我们正在迭代输入列表,并为每个元素调用 onNext( ),当我们完成将 List 转换为 Observable 时,我们调用 onComplete()

P.S. 此代码只是一个演示,请不要使用它List转换为Observable。使用运算符Observable.from()

更新:

运算符from( )实现:

...
while (true) {
    if (o.isUnsubscribed()) {
        return;
    } else if (it.hasNext()) {
        o.onNext(it.next());
    } else if (!o.isUnsubscribed()) {
        o.onCompleted();
        return;
    } else {
        // is unsubscribed
        return;
    }
}
...

link:https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L75-L87