为什么我的 RxJava observable 没有触发订阅者?
Why is my RxJava observable not firing off subscribers?
我正在研究 RxJava,我想流式传输一千个连续的整数。然后我想把它们异步拆分成奇数流和偶数流,然后异步打印出来。
但是,我什么也没有打印出来,或者至少只有非常部分的输出。我错过了什么?我的时间安排有误吗?还是控制台在 Eclipse 中存在多线程问题?
public static void main(String[] args) {
List<Integer> values = IntStream.range(0,1000).mapToObj(i -> Integer.valueOf(i)).collect(Collectors.toList());
Observable<Integer> ints = Observable.from(values).subscribeOn(Schedulers.computation());
Observable<Integer> evens = ints.filter(i -> Math.abs(i) % 2 == 0);
Observable<Integer> odds = ints.filter(i -> Math.abs(i) % 2 != 0);
evens.subscribe(i -> System.out.println(i + " IS EVEN " + Thread.currentThread().getName()));
odds.subscribe(i -> System.out.println(i + " IS ODD " + Thread.currentThread().getName()));
}
您正在使用 Schedules.computation
的 运行 守护进程线程启动处理管道。因此,当您的 main
线程完成时,这些线程会在处理您的可观察对象之前终止。
因此,如果您希望看到打印的结果,您可以让主线程等待结果(例如 Thread.sleep
)或通过删除 subscribeOn
订阅调用线程。还有一个选项可以创建您自己的调度程序,它将 运行 非守护线程。
我正在研究 RxJava,我想流式传输一千个连续的整数。然后我想把它们异步拆分成奇数流和偶数流,然后异步打印出来。
但是,我什么也没有打印出来,或者至少只有非常部分的输出。我错过了什么?我的时间安排有误吗?还是控制台在 Eclipse 中存在多线程问题?
public static void main(String[] args) {
List<Integer> values = IntStream.range(0,1000).mapToObj(i -> Integer.valueOf(i)).collect(Collectors.toList());
Observable<Integer> ints = Observable.from(values).subscribeOn(Schedulers.computation());
Observable<Integer> evens = ints.filter(i -> Math.abs(i) % 2 == 0);
Observable<Integer> odds = ints.filter(i -> Math.abs(i) % 2 != 0);
evens.subscribe(i -> System.out.println(i + " IS EVEN " + Thread.currentThread().getName()));
odds.subscribe(i -> System.out.println(i + " IS ODD " + Thread.currentThread().getName()));
}
您正在使用 Schedules.computation
的 运行 守护进程线程启动处理管道。因此,当您的 main
线程完成时,这些线程会在处理您的可观察对象之前终止。
因此,如果您希望看到打印的结果,您可以让主线程等待结果(例如 Thread.sleep
)或通过删除 subscribeOn
订阅调用线程。还有一个选项可以创建您自己的调度程序,它将 运行 非守护线程。