为什么在我的可观察序列中需要第二次 onSubscribe() 调用?

Why is the second onSubscribe() call required in my observable sequence?

需要在我的可观察序列中第二次应用 onSubscribe() 运算符,请参见行:details.add(myApi.getDetails(h.getId()).subscribeOn(Schedulers.io()));。如果未应用 onSubscribe() 运算符,则会抛出 NetworkOnMainThreadException

我的理解是,由于我已经在序列的早期应用了 subscribeOn(Schedulers.io()) 运算符,因此所有未来的订阅都应该在 io 调度程序上进行。我的理解有什么问题?这是否可能是改造 beta2 问题,因为在下面的示例中 myApi 实例是通过改造创建的?

myApi.getHeadlines()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new Func1<Headlines, Observable<HeadlineDetail> {
        @Override
        public Observable<HeadlineDetail> call(Headlines headlines) {
            List<Observable<HeadlineDetail>> details = new ArrayList<>();
            for (Headline h : headlines) {
                  details.add(myApi.getDetails(h.getId()).subscribeOn(Schedulers.io()));
            }
            return Observable.merge(details);
        }
    })
    .subscribe(...);

依赖关系:

subscribeOn 设置 observable 启动的线程,但 observeOn 影响用于下游操作的线程。它们是 "observing" 原始可观察值。您从后台线程开始,但将所有内容切换到主线程。尝试将您的 observeOn 移到链中的后面。

有关详细信息,请参阅 observeOn 上的文档。

myApi.getHeadlines()
    .subscribeOn(Schedulers.io())
    .flatMap(new Func1<Headlines, Observable<HeadlineDetail> {
        @Override
        public Observable<HeadlineDetail> call(Headlines headlines) {
            List<Observable<HeadlineDetail>> details = new ArrayList<>();
            for (Headline h : headlines) {
                  details.add(myApi.getDetails(h.getId()));
            }
            return Observable.merge(details);
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);