Spark:操作所有特定的 RDD 或 DataFrame 分区的数据
Spark: Manipulate all of a specific RDD or DataFrame partition's data
我发现一些帖子、文章、文档中的参考资料等暗示您可以使用 foreachPartition
访问特定分区。但是我还没有弄清楚如何处理给定分区中的所有数据。
我的目标是 select 数据库中的一些数据,对其进行操作,按列中的唯一值进行分区,然后将这些分区中的每一个作为一个专门命名的 jsonl 文件写入 s3 以被另一个系统访问。
repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionToS3)
我尝试了很多方法来解析该数据,但似乎我只能在 foreachPartition
中获取单个元组,并且分区本身没有界限,以便有效地分离这些数据。
def writePartitionsToS3(partition):
for row in partition:
pprint (row)
产生(为简洁起见删除了几列):
Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3,
25)) Row(entity_id=u'2315183', ... processed_date=datetime.date(2015,
3, 25)) Row(entity_id=u'2315183', ...
processed_date=datetime.date(2015, 3, 25)) Row(entity_id=u'2315183',
... processed_date=datetime.date(2015, 3, 25))
也有可能分区没有像我想的那样定义,但我知道有一个内置的 DataFrameWriter
可以按分区写入,但我不能使用它。我真的需要能够生成这样的命名文件,而不是 part-xxx 格式:
s3a://<bucket>/<prefix>/<date processed>.jsonl
我以分区大小相对较小的方式分块数据(每个 processed_date、每个实体一个 selected,因为它是自己的 DataFrame),所以这不是一个问题,但我也不想 collect()
一个节点上的所有内容来解析分区列表,因为我想将文件并行写入 s3。
更新:
我最终通过获取唯一值然后根据这些值过滤原始数据集实际上解决了我的问题。请记住,如果数据集非常大,您将永远不想这样做,但我可以选择,因为我在循环中创建了小型数据帧(selecting 来自数据库),然后处理这些块.
# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date') \
.distinct().rdd.map(lambda r: r[0]).collect()
# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
sortedRowsThisProcessedDate = myDataframe.filter(postgresDF.processed_date == date)
# some custom function to write the data
writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())
综上所述,我敢肯定在很多方面这都是非常低效的。我正在考虑的一件事是根据需要写入每个文件的确切值集对每个 RDD 进行重新分区,因为对 s3 的写入必须以原子方式完成。我认为除此之外可能有助于避免在写入数据之前从多个节点收集数据。
访问无限制。 DataFrame.repartition
使用散列分区程序来打乱数据,因此行的共现没有更广泛的意义。您在这里可以假设的是特定 processed_date
的所有记录都位于特定分区上。
您可以通过添加 sortWithinPartitions
:
稍微改善一下情况
(myDataframe
.repartition("processed_date")
.sortWithinPartitions("processed_date"))
能够逐一访问单个日期的所有记录。
另一个可能的改进是使用orderBy
方法:
myDataframe.orderBy("processed_date")
这将导致连续的日期,但仍然无法访问边界。
在这两种情况下,您都必须在遍历分区时手动检测边界。
最后如果你想要真正的控制使用RDD
和repartitionAndSortWithinPartitions
方法。这将为您提供对数据分布的细粒度控制。您可以定义 partitionFunc
以特定方式分发数据,因此预先没有分区边界。
DataFrameWriter.partitionBy
使用不同的机制,在这里对您没有用。
我发现一些帖子、文章、文档中的参考资料等暗示您可以使用 foreachPartition
访问特定分区。但是我还没有弄清楚如何处理给定分区中的所有数据。
我的目标是 select 数据库中的一些数据,对其进行操作,按列中的唯一值进行分区,然后将这些分区中的每一个作为一个专门命名的 jsonl 文件写入 s3 以被另一个系统访问。
repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionToS3)
我尝试了很多方法来解析该数据,但似乎我只能在 foreachPartition
中获取单个元组,并且分区本身没有界限,以便有效地分离这些数据。
def writePartitionsToS3(partition):
for row in partition:
pprint (row)
产生(为简洁起见删除了几列):
Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25)) Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25)) Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25)) Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25))
也有可能分区没有像我想的那样定义,但我知道有一个内置的 DataFrameWriter
可以按分区写入,但我不能使用它。我真的需要能够生成这样的命名文件,而不是 part-xxx 格式:
s3a://<bucket>/<prefix>/<date processed>.jsonl
我以分区大小相对较小的方式分块数据(每个 processed_date、每个实体一个 selected,因为它是自己的 DataFrame),所以这不是一个问题,但我也不想 collect()
一个节点上的所有内容来解析分区列表,因为我想将文件并行写入 s3。
更新:
我最终通过获取唯一值然后根据这些值过滤原始数据集实际上解决了我的问题。请记住,如果数据集非常大,您将永远不想这样做,但我可以选择,因为我在循环中创建了小型数据帧(selecting 来自数据库),然后处理这些块.
# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date') \
.distinct().rdd.map(lambda r: r[0]).collect()
# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
sortedRowsThisProcessedDate = myDataframe.filter(postgresDF.processed_date == date)
# some custom function to write the data
writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())
综上所述,我敢肯定在很多方面这都是非常低效的。我正在考虑的一件事是根据需要写入每个文件的确切值集对每个 RDD 进行重新分区,因为对 s3 的写入必须以原子方式完成。我认为除此之外可能有助于避免在写入数据之前从多个节点收集数据。
访问无限制。 DataFrame.repartition
使用散列分区程序来打乱数据,因此行的共现没有更广泛的意义。您在这里可以假设的是特定 processed_date
的所有记录都位于特定分区上。
您可以通过添加 sortWithinPartitions
:
(myDataframe
.repartition("processed_date")
.sortWithinPartitions("processed_date"))
能够逐一访问单个日期的所有记录。
另一个可能的改进是使用orderBy
方法:
myDataframe.orderBy("processed_date")
这将导致连续的日期,但仍然无法访问边界。
在这两种情况下,您都必须在遍历分区时手动检测边界。
最后如果你想要真正的控制使用RDD
和repartitionAndSortWithinPartitions
方法。这将为您提供对数据分布的细粒度控制。您可以定义 partitionFunc
以特定方式分发数据,因此预先没有分区边界。
DataFrameWriter.partitionBy
使用不同的机制,在这里对您没有用。