适配 RX Completable 的代码不会阻塞 onSubscribe 线程
Code adapted to RX Completable not blocking onSubscribe thread
我有一些遗留的非 RX 代码,它们通过生成新线程来完成一些网络工作。
工作完成后,它会在回调中调用一个方法。
我无法控制运行此代码的线程。它是遗留的,它自己产生了一个新的 Thread
。
这可以简化为:
interface Callback {
void onSuccess();
}
static void executeRequest(String name, Callback callback) {
new Thread(() -> {
try {
System.out.println(" Starting... " + name);
Thread.sleep(2000);
System.out.println(" Finishing... " + name);
callback.onSuccess();
} catch (InterruptedException ignored) {}
}).start();
}
我想将其转换为 RX Completable
。为此,我使用 Completable#create()
。
CompletableEmitter
的实现调用 executeRequest
传递 Callback
的实现,在请求完成时发出信号。
订阅后我还会打印日志跟踪以帮助我进行调试。
static Completable createRequestCompletable(String name) {
return Completable.create(e -> executeRequest(name, e::onComplete))
.doOnSubscribe(d -> System.out.println("Subscribed to " + name));
}
这按预期工作。 Completable
仅在 "request" 完成并调用回调后才完成。
问题是,当在 trampoline
调度程序中订阅这些可完成项时,它不会在订阅第二个请求之前等待第一个请求完成。
此代码:
final Completable c1 = createRequestCompletable("1");
c1.subscribeOn(Schedulers.trampoline()).subscribe();
final Completable c2 = createRequestCompletable("2");
c2.subscribeOn(Schedulers.trampoline()).subscribe();
输出:
Subscribed to 1
Starting... 1
Subscribed to 2
Starting... 2
Finishing... 1
Finishing... 2
如您所见,它在第一个 Completable
完成之前订阅了第二个 Completable
,即使我在 trampoline
.
中订阅也是如此
我想排队 completables
,以便第二个等待第一个完成,输出:
Subscribed to 1
Starting... 1
Finishing... 1
Subscribed to 2
Starting... 2
Finishing... 2
我确定问题与在工作线程中完成的工作有关。如果 Completable
的实现没有产生新线程,它会按预期工作。
但这是遗留代码,我想做的是在不修改的情况下将其适应 RX。
注意:请求在程序的不同点执行 - 我不能使用 andThen
或 concat
来实现序列化执行。
我通过使用 Latch
显式阻止订阅 Thread
成功地按顺序执行了 Completables
。
但我不认为这是在 RX 中执行此操作的惯用方法,而且我仍然不明白为什么我需要这样做并且线程在 Completable
完成之前不会被阻塞。
static Completable createRequestCompletable(String name) {
final CountDownLatch latch = new CountDownLatch(1);
return Completable.create(e -> {
executeRequest(name, () -> {
e.onComplete();
latch.countDown();
});
latch.await();
})
.doOnSubscribe(disposable -> System.out.println("Subscribed to " + name));
}
我有一些遗留的非 RX 代码,它们通过生成新线程来完成一些网络工作。 工作完成后,它会在回调中调用一个方法。
我无法控制运行此代码的线程。它是遗留的,它自己产生了一个新的 Thread
。
这可以简化为:
interface Callback {
void onSuccess();
}
static void executeRequest(String name, Callback callback) {
new Thread(() -> {
try {
System.out.println(" Starting... " + name);
Thread.sleep(2000);
System.out.println(" Finishing... " + name);
callback.onSuccess();
} catch (InterruptedException ignored) {}
}).start();
}
我想将其转换为 RX Completable
。为此,我使用 Completable#create()
。
CompletableEmitter
的实现调用 executeRequest
传递 Callback
的实现,在请求完成时发出信号。
订阅后我还会打印日志跟踪以帮助我进行调试。
static Completable createRequestCompletable(String name) {
return Completable.create(e -> executeRequest(name, e::onComplete))
.doOnSubscribe(d -> System.out.println("Subscribed to " + name));
}
这按预期工作。 Completable
仅在 "request" 完成并调用回调后才完成。
问题是,当在 trampoline
调度程序中订阅这些可完成项时,它不会在订阅第二个请求之前等待第一个请求完成。
此代码:
final Completable c1 = createRequestCompletable("1");
c1.subscribeOn(Schedulers.trampoline()).subscribe();
final Completable c2 = createRequestCompletable("2");
c2.subscribeOn(Schedulers.trampoline()).subscribe();
输出:
Subscribed to 1
Starting... 1
Subscribed to 2
Starting... 2
Finishing... 1
Finishing... 2
如您所见,它在第一个 Completable
完成之前订阅了第二个 Completable
,即使我在 trampoline
.
我想排队 completables
,以便第二个等待第一个完成,输出:
Subscribed to 1
Starting... 1
Finishing... 1
Subscribed to 2
Starting... 2
Finishing... 2
我确定问题与在工作线程中完成的工作有关。如果 Completable
的实现没有产生新线程,它会按预期工作。
但这是遗留代码,我想做的是在不修改的情况下将其适应 RX。
注意:请求在程序的不同点执行 - 我不能使用 andThen
或 concat
来实现序列化执行。
我通过使用 Latch
显式阻止订阅 Thread
成功地按顺序执行了 Completables
。
但我不认为这是在 RX 中执行此操作的惯用方法,而且我仍然不明白为什么我需要这样做并且线程在 Completable
完成之前不会被阻塞。
static Completable createRequestCompletable(String name) {
final CountDownLatch latch = new CountDownLatch(1);
return Completable.create(e -> {
executeRequest(name, () -> {
e.onComplete();
latch.countDown();
});
latch.await();
})
.doOnSubscribe(disposable -> System.out.println("Subscribed to " + name));
}