主线程中的 SubscribeOn 和 observeOn

SubscribeOn and observeOn in the main thread

我正在尝试将 subscribeOn 和 obsereOn 与 Executor 一起使用,以便在异步任务完成后让我返回主线程。 我最终得到了这段代码,但它不起作用

@Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
//        while (tasks.size() != 0) {
//            tasks.take().run();
//        }
        System.out.println("done");
    }

    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();


    private void processValue(int value) throws InterruptedException {
        Observable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(Schedulers.from(command -> tasks.add(command)))
                .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
        tasks.take().run();
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

知道如何实现我想要的吗?

当我运行我只打印

Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done

此致

你的问题不会出现在RxJava2中。推荐使用RxJava2.

我对比了RxJava-1.2.7和RxJava-2.0.7,找到了根本原因。现在我正在寻找解决方案。

在 RxJava-1.2.7.You 中可以看到 ObservableObserveOn#145 并在调用 request 时找到 schedule 任务。这意味着当你订阅它时它会调用 Executor.execute。因此,您的任务队列会立即接受 Runnable。然后你拿 运行 Runnable(这是实际的 ExecutorSchedulerWorker)但是上游的 onNext 还没有被调用(因为你睡了 2000 毫秒)。它将在 ObserveOnSubscriber#213 return null。当上游调用onNext(Integer)时,任务永远不会是运行.

你的方法的问题是你不知道在给定的时间应该执行多少任务,也不会在你解除主线程阻塞后等待应该发生的任务时死锁。

据我所知,1.x 的任何扩展都不支持返回 Java 主线程。对于 2.x,扩展项目中的 BlockingScheduler 允许您这样做:

public static void main(String[] args) {
    BlockingScheduler scheduler = new BlockingScheduler();

    scheduler.execute(() -> {
        Flowable.range(1,10)
        .subscribeOn(Schedulers.io())
        .observeOn(scheduler)
        .doAfterTerminate(() -> scheduler.shutdown())
        .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
    });

    System.out.println("BlockingScheduler finished");
}

注意对 scheduler.shutdown() 的调用,最终必须调用它来释放主线程,否则您的程序可能永远不会终止。

我只是根据 akanord 的建议更新了我的代码,但这种方法似乎将一个任务阻塞到另一个任务,并且最终 运行ning 顺序。

使用代码:

   @Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
        System.out.println("done");
    }

    private void processValue(int value) throws InterruptedException {
        BlockingScheduler scheduler = new BlockingScheduler();
        scheduler.execute(() -> Flowable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(scheduler)
                .doAfterTerminate(() -> scheduler.shutdown())
                .subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

和输出

Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done

我要实现的就是这个输出

Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done

所以每次主线程 运行 管道 运行 另一个线程中的 onNext 然后它 return 从方法直到另一个线程完成并使其成为主线程回到管道。