RxJava reduce() 在并行化时会不安全吗?
Can RxJava reduce() be unsafe when parallelized?
我想在 observable 上使用 reduce()
操作将其映射到 Guava ImmutableList
,因为我更喜欢它而不是标准 ArrayList
。
Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
.map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));
很简单。但是假设我在某处安排了与多个线程或其他东西并行的可观察 strings
。这不会破坏 reduce()
操作并可能导致竞争条件吗?特别是因为 ImmutableList.Builder
很容易受到影响?
根据Observable contract, an observable must not make onNext
calls in parallel, so you have to modify your strings
Observable to respect this. You can use the serialize运算符实现。
问题在于链的实现之间的共享状态。这是陷阱 #8 my blog:
Observable 链中的共享状态
假设您对 List 和 toList() 运算符 returns 的性能或类型不满意,并且您想推出自己的聚合器而不是它。对于更改,您想通过使用现有运算符来执行此操作,并且您找到了运算符 reduce():
Observable<Vector<Integer>> list = Observable
.range(1, 3)
.reduce(new Vector<Integer>(), (vector, value) -> {
vector.add(value);
return vector;
});
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
当您 运行 调用 'test' 时,第一个打印您期望的内容,但第二个打印一个向量,其中范围 1-3 出现两次,第三个订阅打印 9 个元素!
问题不在于 reduce() 运算符本身,而在于围绕它的期望。建立链时,传入的新 Vector 是一个 'global' 实例,将在链的所有评估之间共享。
当然,有一种方法可以解决此问题,而无需为整个目的实施运算符(如果您在之前的 CounterOp 中看到了潜力,这应该非常简单):
Observable<Vector<Integer>> list2 = Observable
.range(1, 3)
.reduce((Vector<Integer>)null, (vector, value) -> {
if (vector == null) {
vector = new Vector<>();
}
vector.add(value);
return vector;
});
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
您需要从 null 开始并在 accumulator 函数中创建一个向量,现在订阅者之间不共享该向量。
或者,您可以查看 collect()
运算符,它具有初始值的工厂回调。
这里的经验法则是,每当您看到类似聚合器的运算符获取一些普通值时,请小心,因为这 'initial value' 很可能会在所有订阅者之间共享,如果您打算使用结果与多个订阅者一起流,他们会发生冲突并可能给您带来意想不到的结果甚至崩溃。
我想在 observable 上使用 reduce()
操作将其映射到 Guava ImmutableList
,因为我更喜欢它而不是标准 ArrayList
。
Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
.map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));
很简单。但是假设我在某处安排了与多个线程或其他东西并行的可观察 strings
。这不会破坏 reduce()
操作并可能导致竞争条件吗?特别是因为 ImmutableList.Builder
很容易受到影响?
根据Observable contract, an observable must not make onNext
calls in parallel, so you have to modify your strings
Observable to respect this. You can use the serialize运算符实现。
问题在于链的实现之间的共享状态。这是陷阱 #8 my blog:
Observable 链中的共享状态
假设您对 List 和 toList() 运算符 returns 的性能或类型不满意,并且您想推出自己的聚合器而不是它。对于更改,您想通过使用现有运算符来执行此操作,并且您找到了运算符 reduce():
Observable<Vector<Integer>> list = Observable
.range(1, 3)
.reduce(new Vector<Integer>(), (vector, value) -> {
vector.add(value);
return vector;
});
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
当您 运行 调用 'test' 时,第一个打印您期望的内容,但第二个打印一个向量,其中范围 1-3 出现两次,第三个订阅打印 9 个元素!
问题不在于 reduce() 运算符本身,而在于围绕它的期望。建立链时,传入的新 Vector 是一个 'global' 实例,将在链的所有评估之间共享。
当然,有一种方法可以解决此问题,而无需为整个目的实施运算符(如果您在之前的 CounterOp 中看到了潜力,这应该非常简单):
Observable<Vector<Integer>> list2 = Observable
.range(1, 3)
.reduce((Vector<Integer>)null, (vector, value) -> {
if (vector == null) {
vector = new Vector<>();
}
vector.add(value);
return vector;
});
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
您需要从 null 开始并在 accumulator 函数中创建一个向量,现在订阅者之间不共享该向量。
或者,您可以查看 collect()
运算符,它具有初始值的工厂回调。
这里的经验法则是,每当您看到类似聚合器的运算符获取一些普通值时,请小心,因为这 'initial value' 很可能会在所有订阅者之间共享,如果您打算使用结果与多个订阅者一起流,他们会发生冲突并可能给您带来意想不到的结果甚至崩溃。