rx subscribeOn doesn't work with Subject?

Here is the code:

Observable<Integer> observable = Observable.just(1, 2);
observable.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
          .subscribeOn(Schedulers.computation())
          .doOnNext(i -> System.out.println("Emitting " + i + " after subscribeOn" + " on thread " + Thread.currentThread().getName()))
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));

It gives me this output:

I/System.out: Emitting 1 on thread RxComputationScheduler-1
I/System.out: Emitting 1 after subscribeOn on thread RxComputationScheduler-1
I/System.out: Emitting 2 on thread RxComputationScheduler-1
I/System.out: Emitting 2 after subscribeOn on thread RxComputationScheduler-1
I/System.out: Receiving 1 on thread main
I/System.out: Receiving 2 on thread main

Everything works as expected.

Now I want to do the same thing with a Subject, so I changed the code as follows:

PublishSubject<Integer> subject = PublishSubject.create();

subject.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
       .subscribeOn(Schedulers.computation())
       .doOnNext(i -> System.out.println("Emitting " + i + " after subscribeOn" + " on thread " + Thread.currentThread().getName()))
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));

subject.onNext(1);

In this case I get no output at all. Why is that? And how can I force a Subject to emit its stream on a given scheduler?

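subscribeOn only moves the act of subscribing onto the scheduler; a Subject emits on whatever thread calls onNext. Here the subscription is handed off to the computation scheduler, and subject.onNext(1) usually runs before that subscription has completed, so the PublishSubject has no observer yet and drops the value. The solution is to use observeOn instead: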

PublishSubject<Integer> subject = PublishSubject.create();
subject
      .observeOn(Schedulers.computation())
      .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));

subject.onNext(1);
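
For anyone who wants to see the timing in isolation, here is a rough plain-Java model of the two operators. It is only a sketch: TinySubject, subscribeLate and hopPerItem are hypothetical names made up for illustration, not RxJava APIs, and the latch pins down an ordering that in the real code is merely the likely outcome of a race.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeTimingSketch {
    // Hypothetical stand-in for a PublishSubject (not RxJava code):
    // onNext runs on the caller's thread and drops items when nobody is subscribed.
    static final class TinySubject<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();
        void subscribe(Consumer<T> observer) { observers.add(observer); }
        void onNext(T item) { observers.forEach(o -> o.accept(item)); }
    }

    // Single background thread with a fixed name, so the output is predictable.
    static ExecutorService worker() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
    }

    // Lets queued tasks finish, then waits for the worker thread to stop.
    static void drain(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Models subscribeOn: the subscription itself is deferred to the worker.
    static List<String> subscribeLate() {
        TinySubject<Integer> subject = new TinySubject<>();
        List<String> received = new CopyOnWriteArrayList<>();
        ExecutorService pool = worker();
        CountDownLatch emitted = new CountDownLatch(1);
        pool.submit(() -> {
            emitted.await(); // assumption: force "emit first, subscribe second"
            subject.subscribe(i -> received.add(i + " on " + Thread.currentThread().getName()));
            return null;     // returning a value makes this a Callable, so await() may throw
        });
        subject.onNext(1);   // no observer attached yet, so the item is dropped
        emitted.countDown();
        drain(pool);
        return received;
    }

    // Models observeOn: subscribe immediately, then hop threads for each item.
    static List<String> hopPerItem() {
        TinySubject<Integer> subject = new TinySubject<>();
        List<String> received = new CopyOnWriteArrayList<>();
        ExecutorService pool = worker();
        subject.subscribe(i -> pool.execute(
                () -> received.add(i + " on " + Thread.currentThread().getName())));
        subject.onNext(1);   // observer is already attached, item is forwarded
        drain(pool);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("subscribeOn-style: " + subscribeLate());
        System.out.println("observeOn-style:   " + hopPerItem());
    }
}
```

In this model the subscribeOn-style run ends with an empty list, while the observeOn-style run receives the item on the worker thread.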

Alternatively, you can call subject.onNext(1) from a Schedulers.computation() thread.
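
The second option works because a subject delivers each item on the thread that calls onNext. A small plain-Java sketch of that idea follows; MiniSubject and the thread name "computation-1" are hypothetical stand-ins chosen for illustration, not RxJava types or the scheduler's real thread names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class EmitFromWorkerSketch {
    // Hypothetical minimal subject: observers run on the thread that calls onNext.
    static final class MiniSubject<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();
        void subscribe(Consumer<T> observer) { observers.add(observer); }
        void onNext(T item) { observers.forEach(o -> o.accept(item)); }
    }

    static List<String> emitFromWorker() {
        MiniSubject<Integer> subject = new MiniSubject<>();
        List<String> log = new CopyOnWriteArrayList<>();

        // Subscribe on the calling thread, before anything is emitted.
        subject.subscribe(i -> log.add(
                "Emitting " + i + " on thread " + Thread.currentThread().getName()));

        // Stand-in for a computation scheduler thread (the name is an assumption).
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-1"));
        computation.execute(() -> subject.onNext(1)); // emit from the background thread

        computation.shutdown();
        try {
            computation.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(emitFromWorker());
    }
}
```

With real RxJava, and assuming version 2 or later (the question does not say which version is in use), the equivalent call would be something like Schedulers.computation().scheduleDirect(() -> subject.onNext(1)), made after the subscriber has been attached.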