RxJava flatMap 不交错结果
RxJava flatMap does not interleave results
根据 ReactiveX 文档:
Note that FlatMap merges the emissions of these Observables, so that
they may interleave.
我试过了,似乎遇到了问题。
请注意 11, 12, 13 and 14
所有 运行 都在 Scheduler.compute()
线程上。然后 15 and 21
运行 在一起 2 秒后(意料之中,因为 delayedIdentity(...)
函数中指定的延迟)。然而,所有后续的 运行 都发生在前一个 运行 之后 2 秒(参见第 7-10 行;数据 22, 23, 24 and 25
)。我希望这最后 4 个交错,因为我使用了 flatMap(...)
。似乎在使用 flatMap 时,它会等待一个元素的结果,然后再继续下一个元素,这会阻止它交错。
我还使用 delayedEcho(...)
替代 delayedIdentity(...)
,它也产生了非交错结果。
给定以下代码:
public class Main {
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
System.out.println(MessageFormat.format("{0} {1} {2}", getLine(), Instant.now(), "Start"));
Arrays.asList(11, 12, 13, 14, 15)
.stream()
.map(n -> Observable.just(n))
.map(o -> o
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println))
.collect(Collectors.toSet());
Observable.just(21, 22, 23, 24, 25)
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println);
Thread.sleep(25 * 1000);
}
public static Observable<Integer> delayedIdentity(Integer n) {
return Observable.create(s -> {
try {
Thread.sleep(2 * 1000);
s.onNext(n);
s.onCompleted();
} catch (InterruptedException e) {
s.onError(e);
}
});
}
public synchronized static void println(Object o) {
System.out.println(MessageFormat.format("{0} {1} {2}", getLine(), Instant.now(), o));
}
public synchronized static int getLine() {
return i++;
}
}
给出以下控制台日志:
0 2016-07-21T06:05:36.908Z Start
1 2016-07-21T06:05:39.169Z 11
2 2016-07-21T06:05:39.169Z 14
3 2016-07-21T06:05:39.169Z 12
4 2016-07-21T06:05:39.170Z 13
5 2016-07-21T06:05:41.171Z 15
6 2016-07-21T06:05:41.172Z 21
7 2016-07-21T06:05:43.175Z 22
8 2016-07-21T06:05:45.176Z 23
9 2016-07-21T06:05:47.180Z 24
10 2016-07-21T06:05:49.182Z 25
Process finished with exit code 0
回声备选方案:
public static Observable<Integer> delayedEcho(Integer n) {
return Observable.create(s -> {
try {
s.onNext(n);
Thread.sleep(2 * 1000);
s.onNext(n);
s.onCompleted();
} catch (InterruptedException e) {
s.onError(e);
}
});
}
这导致了以下类似的结果:
0 2016-07-21T06:13:19.867Z Start
1 2016-07-21T06:13:20.073Z 11
2 2016-07-21T06:13:20.074Z 14
3 2016-07-21T06:13:20.074Z 13
4 2016-07-21T06:13:20.075Z 12
5 2016-07-21T06:13:22.078Z 11
6 2016-07-21T06:13:22.079Z 12
7 2016-07-21T06:13:22.079Z 13
8 2016-07-21T06:13:22.080Z 14
9 2016-07-21T06:13:22.081Z 15
10 2016-07-21T06:13:22.081Z 21
11 2016-07-21T06:13:24.087Z 15
12 2016-07-21T06:13:24.087Z 21
13 2016-07-21T06:13:24.087Z 22
14 2016-07-21T06:13:26.089Z 22
15 2016-07-21T06:13:26.089Z 23
16 2016-07-21T06:13:28.091Z 23
17 2016-07-21T06:13:28.092Z 24
18 2016-07-21T06:13:30.094Z 24
19 2016-07-21T06:13:30.095Z 25
20 2016-07-21T06:13:32.098Z 25
Process finished with exit code 0
我做错了什么?
只是为了确保您了解 Schedulers.computation()
调度程序 - 它不是 1 个线程,它主要类似于线程池。如果 1 个线程忙,它将 return 你新的线程。要检查它,只需放置
System.out.println(MessageFormat.format("Thread id = {0}", Thread.currentThread().getId()));
在您的 delayedIdentity
函数中。有了这些知识可能会更清楚。
我不是 100% 确定这里让您感到困惑的是什么,但我会尝试逐步解释结果。
Arrays.asList(11, 12, 13, 14, 15)
.stream()
.map(n -> Observable.just(n))
.map(o -> o
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println))
.collect(Collectors.toSet());
因此,当您 运行 该代码时,流中的所有数据都会转换为 5 Observables
,然后几乎同时开始在延迟 delayedIdentity
后仅发出一项.结果,您几乎同时会在控制台中看到 11, 12, 13, 14, 15
。它不应该保存订单,因为 stream
可以在无序状态下传递。
因此,因为您在第一个语句中使用了非主线程,所以第二个语句将在第一个语句之后立即开始
Observable.just(21, 22, 23, 24, 25)
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println);
所以简单地说 21,将在 delayedIdentity
等待 2 秒,几乎与 11, 12, 13, 14, 15
相同,然后所有数字将打印在控制台中。 22, 23, 24, 25
- 将在 2 秒后一个一个打印出来。
有不明白的地方请追问
根据 ReactiveX 文档:
Note that FlatMap merges the emissions of these Observables, so that they may interleave.
我试过了,似乎遇到了问题。
请注意 11, 12, 13 and 14
所有 运行 都在 Scheduler.compute()
线程上。然后 15 and 21
运行 在一起 2 秒后(意料之中,因为 delayedIdentity(...)
函数中指定的延迟)。然而,所有后续的 运行 都发生在前一个 运行 之后 2 秒(参见第 7-10 行;数据 22, 23, 24 and 25
)。我希望这最后 4 个交错,因为我使用了 flatMap(...)
。似乎在使用 flatMap 时,它会等待一个元素的结果,然后再继续下一个元素,这会阻止它交错。
我还使用 delayedEcho(...)
替代 delayedIdentity(...)
,它也产生了非交错结果。
给定以下代码:
public class Main {
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
System.out.println(MessageFormat.format("{0} {1} {2}", getLine(), Instant.now(), "Start"));
Arrays.asList(11, 12, 13, 14, 15)
.stream()
.map(n -> Observable.just(n))
.map(o -> o
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println))
.collect(Collectors.toSet());
Observable.just(21, 22, 23, 24, 25)
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println);
Thread.sleep(25 * 1000);
}
public static Observable<Integer> delayedIdentity(Integer n) {
return Observable.create(s -> {
try {
Thread.sleep(2 * 1000);
s.onNext(n);
s.onCompleted();
} catch (InterruptedException e) {
s.onError(e);
}
});
}
public synchronized static void println(Object o) {
System.out.println(MessageFormat.format("{0} {1} {2}", getLine(), Instant.now(), o));
}
public synchronized static int getLine() {
return i++;
}
}
给出以下控制台日志:
0 2016-07-21T06:05:36.908Z Start
1 2016-07-21T06:05:39.169Z 11
2 2016-07-21T06:05:39.169Z 14
3 2016-07-21T06:05:39.169Z 12
4 2016-07-21T06:05:39.170Z 13
5 2016-07-21T06:05:41.171Z 15
6 2016-07-21T06:05:41.172Z 21
7 2016-07-21T06:05:43.175Z 22
8 2016-07-21T06:05:45.176Z 23
9 2016-07-21T06:05:47.180Z 24
10 2016-07-21T06:05:49.182Z 25
Process finished with exit code 0
回声备选方案:
public static Observable<Integer> delayedEcho(Integer n) {
return Observable.create(s -> {
try {
s.onNext(n);
Thread.sleep(2 * 1000);
s.onNext(n);
s.onCompleted();
} catch (InterruptedException e) {
s.onError(e);
}
});
}
这导致了以下类似的结果:
0 2016-07-21T06:13:19.867Z Start
1 2016-07-21T06:13:20.073Z 11
2 2016-07-21T06:13:20.074Z 14
3 2016-07-21T06:13:20.074Z 13
4 2016-07-21T06:13:20.075Z 12
5 2016-07-21T06:13:22.078Z 11
6 2016-07-21T06:13:22.079Z 12
7 2016-07-21T06:13:22.079Z 13
8 2016-07-21T06:13:22.080Z 14
9 2016-07-21T06:13:22.081Z 15
10 2016-07-21T06:13:22.081Z 21
11 2016-07-21T06:13:24.087Z 15
12 2016-07-21T06:13:24.087Z 21
13 2016-07-21T06:13:24.087Z 22
14 2016-07-21T06:13:26.089Z 22
15 2016-07-21T06:13:26.089Z 23
16 2016-07-21T06:13:28.091Z 23
17 2016-07-21T06:13:28.092Z 24
18 2016-07-21T06:13:30.094Z 24
19 2016-07-21T06:13:30.095Z 25
20 2016-07-21T06:13:32.098Z 25
Process finished with exit code 0
我做错了什么?
只是为了确保您了解 Schedulers.computation()
调度程序 - 它不是 1 个线程,它主要类似于线程池。如果 1 个线程忙,它将 return 你新的线程。要检查它,只需放置
System.out.println(MessageFormat.format("Thread id = {0}", Thread.currentThread().getId()));
在您的 delayedIdentity
函数中。有了这些知识可能会更清楚。
我不是 100% 确定这里让您感到困惑的是什么,但我会尝试逐步解释结果。
Arrays.asList(11, 12, 13, 14, 15)
.stream()
.map(n -> Observable.just(n))
.map(o -> o
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println))
.collect(Collectors.toSet());
因此,当您 运行 该代码时,流中的所有数据都会转换为 5 Observables
,然后几乎同时开始在延迟 delayedIdentity
后仅发出一项.结果,您几乎同时会在控制台中看到 11, 12, 13, 14, 15
。它不应该保存订单,因为 stream
可以在无序状态下传递。
因此,因为您在第一个语句中使用了非主线程,所以第二个语句将在第一个语句之后立即开始
Observable.just(21, 22, 23, 24, 25)
.observeOn(Schedulers.computation())
.flatMap(Main::delayedIdentity)
.subscribe(Main::println);
所以简单地说 21,将在 delayedIdentity
等待 2 秒,几乎与 11, 12, 13, 14, 15
相同,然后所有数字将打印在控制台中。 22, 23, 24, 25
- 将在 2 秒后一个一个打印出来。
有不明白的地方请追问