适配 RX Completable 的代码不会阻塞 onSubscribe 线程

Code adapted to RX Completable not blocking onSubscribe thread

我有一些遗留的非 RX 代码,它们通过生成新线程来完成一些网络工作。 工作完成后,它会在回调中调用一个方法。

我无法控制运行此代码的线程。它是遗留的,它自己产生了一个新的 Thread

这可以简化为:

interface Callback {
    void onSuccess();
}

static void executeRequest(String name, Callback callback) {
    new Thread(() -> {
        try {
            System.out.println(" Starting... " + name);
            Thread.sleep(2000);
            System.out.println(" Finishing... " + name);
            callback.onSuccess();
        } catch (InterruptedException ignored) {}
    }).start();
}

我想将其转换为 RX Completable。为此,我使用 Completable#create()CompletableEmitter 的实现调用 executeRequest 传递 Callback 的实现,在请求完成时发出信号。

订阅后我还会打印日志跟踪以帮助我进行调试。

static Completable createRequestCompletable(String name) {
        return Completable.create(e -> executeRequest(name, e::onComplete))
                .doOnSubscribe(d -> System.out.println("Subscribed to " + name));
}

这按预期工作。 Completable 仅在 "request" 完成并调用回调后才完成。

问题是,当在 trampoline 调度程序中订阅这些可完成项时,它不会在订阅第二个请求之前等待第一个请求完成。

此代码:

final Completable c1 = createRequestCompletable("1");
c1.subscribeOn(Schedulers.trampoline()).subscribe();

final Completable c2 = createRequestCompletable("2");
c2.subscribeOn(Schedulers.trampoline()).subscribe();

输出:

Subscribed to 1
    Starting... 1 
Subscribed to 2
    Starting... 2 
    Finishing... 1 
    Finishing... 2

如您所见,它在第一个 Completable 完成之前订阅了第二个 Completable,即使我在 trampoline.

中订阅也是如此

我想排队 completables,以便第二个等待第一个完成,输出:

Subscribed to 1
    Starting... 1 
    Finishing... 1
Subscribed to 2
    Starting... 2  
    Finishing... 2

我确定问题与在工作线程中完成的工作有关。如果 Completable 的实现没有产生新线程,它会按预期工作。 但这是遗留代码,我想做的是在不修改的情况下将其适应 RX。

注意:请求在程序的不同点执行 - 我不能使用 andThenconcat 来实现序列化执行。

我通过使用 Latch 显式阻止订阅 Thread 成功地按顺序执行了 Completables。 但我不认为这是在 RX 中执行此操作的惯用方法,而且我仍然不明白为什么我需要这样做并且线程在 Completable 完成之前不会被阻塞。

static Completable createRequestCompletable(String name) {
    final CountDownLatch latch = new CountDownLatch(1);
    return Completable.create(e -> {
        executeRequest(name, () -> {
            e.onComplete();
            latch.countDown();
        });
        latch.await();
    })
    .doOnSubscribe(disposable -> System.out.println("Subscribed to " + name));
}