RxJava 2 入门:串行执行任务。 `andThen` 还是 `defer`?
RxJava 2 Getting Started: serially execute tasks. `andThen` or `defer`?
我想在 RxJava 中连续 运行 两个步骤。我希望第 1 步在第 2 步开始之前完成,例如:
step 1: start
step 1: finish
step 2: start
step 2: finish
我正在尝试不同的 API 变体,RxJava 运行并行执行我的两个步骤,这不是我想要的行为:
step 1: start
step 2: start
step 2: finish
step 1: finish
在下面的代码示例中,我同时尝试了 andThen
和 defer
,并且得到了并行执行。我该如何解决这个问题,以便一个步骤在另一个步骤成功完成后执行?
方法名andThen
表示顺序串行执行。方法 defer
采用一个函数,该函数生成另一个 Completable,这是我期望的用于我想要的串行任务执行的方法签名。也不给我想要的结果。
我需要转换为 Observable/Flowable 吗?或者我可以用 Completable 链接两个步骤吗?
public class RxStep1Then2 {
public static Completable simulateCompletable(ScheduledExecutorService es, String msg, int msDelay) {
System.out.println(String.format("%s: start", msg));
ScheduledFuture<?> future = es.schedule(() -> {
System.out.println(String.format("%s: finish", msg));
}, msDelay, TimeUnit.MILLISECONDS);
return Completable.fromFuture(future);
}
public static void rxMain(ScheduledExecutorService es) {
// Completable c = simulateCompletable(es, "step 1", 1000)
// .andThen(simulateCompletable(es, "step 2", 500));
Completable c = simulateCompletable(es, "step 1", 1000)
.defer(() -> simulateCompletable(es, "step 2", 500));
c.blockingAwait();
System.out.println("blockingAwait done");
}
public static void main(String[] args) throws Exception {
ScheduledExecutorService es = Executors.newScheduledThreadPool(5);
System.out.println("Started ExecutorService.");
rxMain(es);
es.shutdown();
es.awaitTermination(5, TimeUnit.MINUTES);
System.out.println("Shutdown ExecutorService. Done.");
}
}
并行执行的发生是因为您的 simulateCompletable
在 Completable
甚至创建之前就开始了任务。您可以直接使用延迟 Completable
:
Completable.fromAction(() -> System.out.println("First"))
.delay(1, TimeUnit.SECONDS)
.andThen(Completable.fromAction(() -> System.out.println("Second")))
.blockingAwait();
注意
Completable c = simulateCompletable(es, "step 1", 1000)
.defer(() -> simulateCompletable(es, "step 2", 500));
不链接任何操作,因为defer
是创建独立Completable
的静态工厂方法;第一个 simulateCompletable
的 Completable
完全丢失了。
我想在 RxJava 中连续 运行 两个步骤。我希望第 1 步在第 2 步开始之前完成,例如:
step 1: start
step 1: finish
step 2: start
step 2: finish
我正在尝试不同的 API 变体,RxJava 运行并行执行我的两个步骤,这不是我想要的行为:
step 1: start
step 2: start
step 2: finish
step 1: finish
在下面的代码示例中,我同时尝试了 andThen
和 defer
,并且得到了并行执行。我该如何解决这个问题,以便一个步骤在另一个步骤成功完成后执行?
方法名andThen
表示顺序串行执行。方法 defer
采用一个函数,该函数生成另一个 Completable,这是我期望的用于我想要的串行任务执行的方法签名。也不给我想要的结果。
我需要转换为 Observable/Flowable 吗?或者我可以用 Completable 链接两个步骤吗?
public class RxStep1Then2 {
public static Completable simulateCompletable(ScheduledExecutorService es, String msg, int msDelay) {
System.out.println(String.format("%s: start", msg));
ScheduledFuture<?> future = es.schedule(() -> {
System.out.println(String.format("%s: finish", msg));
}, msDelay, TimeUnit.MILLISECONDS);
return Completable.fromFuture(future);
}
public static void rxMain(ScheduledExecutorService es) {
// Completable c = simulateCompletable(es, "step 1", 1000)
// .andThen(simulateCompletable(es, "step 2", 500));
Completable c = simulateCompletable(es, "step 1", 1000)
.defer(() -> simulateCompletable(es, "step 2", 500));
c.blockingAwait();
System.out.println("blockingAwait done");
}
public static void main(String[] args) throws Exception {
ScheduledExecutorService es = Executors.newScheduledThreadPool(5);
System.out.println("Started ExecutorService.");
rxMain(es);
es.shutdown();
es.awaitTermination(5, TimeUnit.MINUTES);
System.out.println("Shutdown ExecutorService. Done.");
}
}
并行执行的发生是因为您的 simulateCompletable
在 Completable
甚至创建之前就开始了任务。您可以直接使用延迟 Completable
:
Completable.fromAction(() -> System.out.println("First"))
.delay(1, TimeUnit.SECONDS)
.andThen(Completable.fromAction(() -> System.out.println("Second")))
.blockingAwait();
注意
Completable c = simulateCompletable(es, "step 1", 1000)
.defer(() -> simulateCompletable(es, "step 2", 500));
不链接任何操作,因为defer
是创建独立Completable
的静态工厂方法;第一个 simulateCompletable
的 Completable
完全丢失了。