在 pyspark 中使用缓冲区收集 RDD

collect RDD with buffer in pyspark

我想要一种从我的 RDD 一次(或小批量)return 行的方法,以便我可以在需要时在本地收集行。我的 RDD 足够大,无法放入名称节点的内存中,因此 运行 collect() 会导致错误。

有没有办法重新创建 collect() 操作但使用生成器,以便将 RDD 中的行传递到缓冲区中?另一种选择是从缓存的 RDD 中一次 take() 100000 行,但我认为 take() 不允许您指定起始位置?

最好的选择是使用 RDD.toLocalIterator,它当时只收集一个分区。它创建了一个标准的 Python 生成器:

rdd = sc.parallelize(range(100000))
iterator = rdd.toLocalIterator()
type(iterator)

## generator

even = (x for x in iterator if not x % 2)

您可以使用特定的分区程序调整单个批次中收集的数据量并调整多个分区。

不幸的是,它是有代价的。要收集小批量数据,您必须启动多个 Spark 作业,而且成本非常高。所以一般来说当时收集一个元素是不行的。