Thread.sleep inside Schedulers.io - RxJava

I'm currently testing RxJava and I'm confused.
I moved a method that calls Thread.sleep(5000) onto Schedulers.io():

CompositeDisposable compositeDisposable = new CompositeDisposable();
        compositeDisposable.add(longOperation()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                customToast("long operation done");
            }

            @Override
            public void onError(Throwable e) {

            }
        }));

I read that long operations should be moved to Schedulers.io() so that they don't freeze the UI thread, but in this case the UI still freezes. What's wrong?
longOperation() contains Thread.sleep(5000).

// Edit: after wrapping the call in a Callable

private void doSomething() throws InterruptedException {
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        compositeDisposable.add(Observable.fromCallable(()-> longOperation())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                customToast("long operation done");
            }

            @Override
            public void onError(Throwable e) {

            }
        }));
    }

  private Completable longOperation() throws InterruptedException {
        Thread.sleep(5000);
        return Completable.complete();
    }

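You should use defer() so that longOperation() actually runs on the scheduler rather than at the moment you create the Completable. In the original code, longOperation() is called on the calling thread while the chain is being built, so Thread.sleep(5000) has already blocked the UI before subscribeOn(Schedulers.io()) takes effect.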

private void doSomething() {
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    // defer() postpones the call to longOperation() until subscription time,
    // so it runs on the thread chosen by subscribeOn().
    compositeDisposable.add(Completable.defer(new Callable<CompletableSource>() {
        @Override
        public CompletableSource call() throws Exception {
            return longOperation();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                    customToast("long operation done");
                }

                @Override
                public void onError(Throwable e) {
                    // Handle or log the error here.
                }
            }));
}

private Completable longOperation() throws InterruptedException {
    Thread.sleep(5000);
    return Completable.complete();
}
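
The difference between calling the method while building the chain and deferring it until subscription can also be shown without RxJava. The following is a minimal plain-Java sketch, assuming an ExecutorService as a stand-in for Schedulers.io(). The class DeferDemo, the methods runEagerly and runDeferred, and the thread name "io-worker" are hypothetical names introduced for illustration, and the sleep is shortened to 100 ms.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DeferDemo {
    // Hypothetical stand-in for longOperation(): blocks, then reports
    // the name of the thread that actually did the work.
    static String longOperation() throws InterruptedException {
        Thread.sleep(100); // shortened from 5000 ms for the demo
        return Thread.currentThread().getName();
    }

    // Eager: the work runs right here on the caller, before the executor
    // is involved. The worker only hands back an already computed value.
    static String runEagerly(ExecutorService io) throws Exception {
        String result = longOperation(); // blocks the calling thread
        return io.submit(() -> result).get();
    }

    // Deferred: the call is wrapped in a Callable, so nothing runs until
    // the worker thread invokes it.
    static String runDeferred(ExecutorService io) throws Exception {
        Callable<String> deferred = () -> longOperation();
        return io.submit(deferred).get();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a single named worker thread plays the role of Schedulers.io().
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            System.out.println("eager ran on:    " + runEagerly(io));
            System.out.println("deferred ran on: " + runDeferred(io));
        } finally {
            io.shutdown();
        }
    }
}
```

In RxJava terms, runEagerly corresponds to passing the result of longOperation() straight into the chain, and runDeferred corresponds to wrapping the call in Completable.defer(...) as in the answer above.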