为什么这总是在主线程上运行?
Why this always runs on main thread?
我试图实现一个非常简单的例子RxAndroid2
。当我尝试 运行 这段代码时,它让我感到困惑。
ArrayList<Integer> arr = new ArrayList<>();
arr.add(0);
arr.add(1);
arr.add(2);
arr.add(3);
arr.add(4);
arr.add(5);
arr.add(6);
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
});
并且日志总是告诉我它 运行 仅在 main
线程上。
对此有什么想法吗?如果您能向我解释更多有关 observeOn() 和 subscribeOn() 的信息,那就太好了。
已编辑:这段代码仍然给我所有主线程。
ArrayList<Integer> arr = new ArrayList<>();
arr.add(0);
arr.add(1);
arr.add(2);
arr.add(3);
arr.add(4);
arr.add(5);
arr.add(6);
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.subscribe(integer -> {
LogUtil.logD(TAG, Thread.currentThread().getName());
});
你有observeOn(AndroidSchedulers.mainThread())
observeOn()
的 javadoc 明确指出:
Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer with Flowable.bufferSize() "island size".
那么为什么您对主线程上发生的事情感到惊讶?!
并且要清楚这一点:您的代码将 Observable 配置为使用 主线程。
进一步研究相关的 API,我 猜测 您必须在您的调度程序上调用 start(),例如:
Scheduler newThreadScheduler = Schedulers.newThread();
newThreadScheduler.start();
Observable.fromIterable(arr)
.subscribeOn(newThreadScheduler)
.observeOn(newThreadScheduler)
您可以移动运算符:
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
// <------------------------
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.observeOn(AndroidSchedulers.mainThread()) // <--------------------------
.subscribe(integer -> {
LogUtil.logD(TAG, Thread.currentThread().getName());
});
这不再在主线程上执行 doOnSubscibe
和 doOnNext
中的代码,只有 subscribe
处理程序在主线程上执行。
我试图实现一个非常简单的例子RxAndroid2
。当我尝试 运行 这段代码时,它让我感到困惑。
ArrayList<Integer> arr = new ArrayList<>();
arr.add(0);
arr.add(1);
arr.add(2);
arr.add(3);
arr.add(4);
arr.add(5);
arr.add(6);
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
});
并且日志总是告诉我它 运行 仅在 main
线程上。
对此有什么想法吗?如果您能向我解释更多有关 observeOn() 和 subscribeOn() 的信息,那就太好了。
已编辑:这段代码仍然给我所有主线程。
ArrayList<Integer> arr = new ArrayList<>();
arr.add(0);
arr.add(1);
arr.add(2);
arr.add(3);
arr.add(4);
arr.add(5);
arr.add(6);
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.subscribe(integer -> {
LogUtil.logD(TAG, Thread.currentThread().getName());
});
你有observeOn(AndroidSchedulers.mainThread())
observeOn()
的 javadoc 明确指出:
Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer with Flowable.bufferSize() "island size".
那么为什么您对主线程上发生的事情感到惊讶?!
并且要清楚这一点:您的代码将 Observable 配置为使用 主线程。
进一步研究相关的 API,我 猜测 您必须在您的调度程序上调用 start(),例如:
Scheduler newThreadScheduler = Schedulers.newThread();
newThreadScheduler.start();
Observable.fromIterable(arr)
.subscribeOn(newThreadScheduler)
.observeOn(newThreadScheduler)
您可以移动运算符:
Observable.fromIterable(arr)
.subscribeOn(Schedulers.newThread())
// <------------------------
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.logD(TAG, Thread.currentThread().getName());
}
})
.observeOn(AndroidSchedulers.mainThread()) // <--------------------------
.subscribe(integer -> {
LogUtil.logD(TAG, Thread.currentThread().getName());
});
这不再在主线程上执行 doOnSubscibe
和 doOnNext
中的代码,只有 subscribe
处理程序在主线程上执行。