在 Rxjava 中创建线程
Creating threads in Rxjava
我在 just()
运算符中传递两个参数。我的代码片段是:
Observable<Integer> observable = Observable.just(1,2);
observable.subscribeOn(Schedulers.newThread())
.subscribe(
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
@Override
public void onNext(Integer e) {
System.out.println(e);
//request web service
});
我观察到它没有为每个发出的项目创建单独的线程。作为 just
个参数出现的项目按顺序 运行。如何为每个发出的项目创建单独的线程?
您可以使用 flatMap 并在 flatMap 内部创建新的可观察对象并使用 subscribeOn
@Test
public void test() {
Observable.just(1, 2)
.flatMap(item -> Observable.just(item)
.subscribeOn(Schedulers.newThread())
.doOnNext(i -> System.out.println("Thread:" + Thread.currentThread())))
.subscribe(System.out::println);
}
您可以在此处查看更多关于 async observable 的示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
我在 just()
运算符中传递两个参数。我的代码片段是:
Observable<Integer> observable = Observable.just(1,2);
observable.subscribeOn(Schedulers.newThread())
.subscribe(
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
@Override
public void onNext(Integer e) {
System.out.println(e);
//request web service
});
我观察到它没有为每个发出的项目创建单独的线程。作为 just
个参数出现的项目按顺序 运行。如何为每个发出的项目创建单独的线程?
您可以使用 flatMap 并在 flatMap 内部创建新的可观察对象并使用 subscribeOn
@Test
public void test() {
Observable.just(1, 2)
.flatMap(item -> Observable.just(item)
.subscribeOn(Schedulers.newThread())
.doOnNext(i -> System.out.println("Thread:" + Thread.currentThread())))
.subscribe(System.out::println);
}
您可以在此处查看更多关于 async observable 的示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java