并行执行计算量大的地图

Parallel execution of computationally expensive map

我是 ReactiveX 库的新手(我使用它的 scala 变体 RxScala)。

我有一个 Observable 以高速率发出值。我想将函数应用于 Observable (map) 的所有值。我在 map 中使用的函数在计算上相当昂贵。

有没有办法让线程池并行计算 map 阶段?

是的,有办法做到这一点。

我会将流缓冲成块并使用 Schedulers.computation() 在 cpu 上分配负载(它使用 Executor 基于大小等于可用处理器数量的线程池):

int chunkSize = 1000;
source
  .buffer(chunkSize)
  .flatMap(
    list -> 
      Observable
        .from(list)
        .map(expensive)
        .subscribeOn(Schedulers.computation()))
 ...

如果 map 操作足够昂贵,你可能在没有 buffer 的情况下表现同样出色:

source
  .flatMap(
    x -> 
      Observable
        .just(x)
        .map(expensive)
        .subscribeOn(Schedulers.computation()))