并行执行计算量大的地图
Parallel execution of computationally expensive map
我是 ReactiveX 库的新手(我使用它的 scala 变体 RxScala)。
我有一个 Observable
以高速率发出值。我想将函数应用于 Observable
(map
) 的所有值。我在 map
中使用的函数在计算上相当昂贵。
有没有办法让线程池并行计算 map
阶段?
是的,有办法做到这一点。
我会将流缓冲成块并使用 Schedulers.computation()
在 cpu 上分配负载(它使用 Executor
基于大小等于可用处理器数量的线程池):
int chunkSize = 1000;
source
.buffer(chunkSize)
.flatMap(
list ->
Observable
.from(list)
.map(expensive)
.subscribeOn(Schedulers.computation()))
...
如果 map
操作足够昂贵,你可能在没有 buffer
的情况下表现同样出色:
source
.flatMap(
x ->
Observable
.just(x)
.map(expensive)
.subscribeOn(Schedulers.computation()))
我是 ReactiveX 库的新手(我使用它的 scala 变体 RxScala)。
我有一个 Observable
以高速率发出值。我想将函数应用于 Observable
(map
) 的所有值。我在 map
中使用的函数在计算上相当昂贵。
有没有办法让线程池并行计算 map
阶段?
是的,有办法做到这一点。
我会将流缓冲成块并使用 Schedulers.computation()
在 cpu 上分配负载(它使用 Executor
基于大小等于可用处理器数量的线程池):
int chunkSize = 1000;
source
.buffer(chunkSize)
.flatMap(
list ->
Observable
.from(list)
.map(expensive)
.subscribeOn(Schedulers.computation()))
...
如果 map
操作足够昂贵,你可能在没有 buffer
的情况下表现同样出色:
source
.flatMap(
x ->
Observable
.just(x)
.map(expensive)
.subscribeOn(Schedulers.computation()))