Spark map 只是一个任务,但它应该是并行的(PySpark)

Spark map is only one task while it should be parallel (PySpark)

我有一个 RDD,大约有 700 万个条目,每个条目有 10 个标准化坐标。我也有许多中心,我正在尝试将每个条目映射到最近的(欧几里德距离)中心。问题是这只会生成一个任务,这意味着它没有并行化。这是表格:

def doSomething(point,centers):
    for center in centers.value:
        if(distance(point,center)<1):
             return(center)
    return(None)

preppedData.map(lambda x:doSomething(x,centers)).take(5)

preppedData RDD 已缓存并已求值,doSomething 函数的表示比实际容易得多,但原理相同。中心是一份已经播出的名单。为什么这个地图只出现在一个任务中?

其他项目中的类似代码片段只映射到 +- 100 个任务,并在所有执行器上获得 运行,这个是 1 个执行器上的 1 个任务。我的工作有 8 个执行器,每个执行器有 8 GB 和 2 个内核可用。

这可能是由于 take() 方法的保守性质。 See the code in RDD.scala.

它所做的是首先获取你的 RDD 的第一个分区(如果你的 RDD 不需要洗牌,这将只需要一个任务)并且如果那个分区中有足够的结果,它将 return那个。如果您的分区中没有足够的数据,它将增加它尝试占用的分区数量,直到它获得所需数量的元素。

由于您的 RDD 已经缓存,并且您的操作只是一个 map 函数,只要您的任何 RDD 有 >5 行,这将只需要一个任务。更多的任务将是不必要的。

此代码的存在是为了通过一次从所有分区中提取少量数据来避免驱动程序因过多数据而过载。