RxJava 2 在新线程中创建列表

RxJava 2 creating list in new Thread

开发者! 我正在尝试在实际项目中使用 RxJava,但似乎我没有理解正确的逻辑。我需要在新线程中创建一个对象列表。完成后将此列表发送给观察员。我发现的是:

LinkedList<IntroSliderElement> list = new LinkedList<>();
    list.add(new IntroSliderElement(0, "test 0", 0));
    list.add(new IntroSliderElement(1, "test 1", 1));
    list.add(new IntroSliderElement(2, "test 2", 2));

    Observable<LinkedList<IntroSliderElement>> listObserv = Observable.just(list);
    listObserv
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<IntroSliderElement>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<IntroSliderElement> value) {
                    view().render(new IntroModel.OnFirstSliderElement((LinkedList<IntroSliderElement>) value));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

但是很容易看出,list是在主线程中创建和执行的,那么如何使用rxJava在一个全新的线程中创建呢?

想必你想要的是Observable.fromCallable()

Observable.fromCallable(() -> {
    // init your list here
    yourList = ....
    Observable.fromIterable(yourList);
});

订阅发生时会执行内部代码。 因此,您可以在您喜欢的线程上执行订阅。

Observable.create() 无论订阅是否发生,都会立即执行,因此建议谨慎使用。

正如 Artem Zinnatullin 在 post 中所述:

Don't use Observable.create() if you can, it's very easy to shoot yourself in the foot! (and then shoot again for each new subscriber!)

这里的问题是 just 运算符创建了一个可观察对象,只是 发出了最后一项。它对创作本身没有影响。您需要在 OnSubscribe 回调中创建此列表,并在准备就绪后发出。然后您可以使用 subscribeOn 运算符指定此函数将在哪个调度程序上 运行。

Observable.create(emitter -> {
    LinkedList<IntroSliderElement> list = new LinkedList<>();
    list.add(new IntroSliderElement(0, "test 0", 0));
    list.add(new IntroSliderElement(1, "test 1", 1));
    list.add(new IntroSliderElement(2, "test 2", 2));

    emitter.onNext(list);
    emitter.onComplete();

})
.subscribeOn(Schedulers.newThread())
.observeOn(/* You should choose the thread on which the result will be processed */)
.subscribe(result -> {
    // process the result here
})