pyspark 负载分布不均,零件尺寸增加一倍

pyspark distributes load unevenly with parts increasing in double size

我的 pyspark 进程的输出部分大小不均匀,但可以预见具有 n**2 模式(0、1、2、4、8、16 等)。这是我的过程:

我像这样从 Google BigQuery 加载数据:

dConf = {
    "mapred.bq.project.id": project_id,
    "mapred.bq.gcs.bucket": bucket,
    "mapred.bq.input.project.id": project_id,
    "mapred.bq.input.dataset.id":dataset_id,
    "mapred.bq.input.table.id": table_id
}

rdd_dataset_raw = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=dConf
)

其输出如下所示 (rdd_dataset_raw.take(2)):

[(0, u'{"group_id":"1","pertubations":"Current Affairs,Sport,Technology"}'), 
(67, u'{"group_id":"2","pertubations":"Current Affairs,Sport,Celeb Gossip"}')]

一些琐碎的处理,重新分区:

rdd_dataset = (
    rdd_dataset_raw
    .repartition(nr_partitions)
    .map(lambda t, json=json: json.loads(t[1]))
)

看起来像这样:

[{u'group_id': u'1', u'pertubations': u'Current Affairs,Sport,Technology'}, 
{u'group_id': u'2', u'pertubations': u'Current Affairs,Sport,Celeb Gossip'}]

当我将 RDD 保存到 Google 存储时:

rdd_dataset_raw.saveAsTextFile("gs://bucket/directory")

这将创建 nr_partitions 个零件文件。

但是,这些零件文件的大小并不均匀。它们以 n**2 的形式增加,其中 n 是零件文件编号。换句话说,

part-00000 包含 0 行
part-00001 包含 1 行
part-00002 包含 2 行
part-00003 包含 4 行
part-00004 包含 8 行
等等

其中大部分也几乎立即完成,其中后面的部分 运行 内存不足。

这是怎么回事!?如何使分区负载均匀?

就像用 partitionBy 替换 repartition 一样简单:

rdd_dataset = (
    rdd_dataset_raw
    .partitionBy(nr_partitions)
    .map(lambda t, json=json: json.loads(t[1]))
)

请注意,这需要尽早完成。传递一个未分区的 rdd 然后分区后来坏了。

Docs