Partition a Spark DataFrame based on values in an existing column into a chosen number of partitions
I want to partition a Spark DataFrame into an even number of partitions based on an index column before writing to a file. I want to control how many partitions are created based on the size of the DataFrame, and then use that when writing to a Parquet file with partitionBy.
Given an example DataFrame:
i b
0 11
1 9
2 13
3 2
4 15
5 3
6 14
7 16
8 11
9 9
10 17
11 10
Say I want to create 4 partitions based on the values in column i; the partitions would then correspond to the values assigned to column g:
g i b
0 0 11
0 1 9
0 2 13
1 3 2
1 4 15
1 5 3
2 6 14
2 7 16
2 8 11
3 9 9
3 10 17
3 11 10
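Put differently, with N partitions over R rows the assignment is g = floor(i / ceil(R / N)); here, with 12 rows and 4 partitions, that gives 3 rows per group. A minimal Scala sketch of that mapping, purely for illustration (sampleDf is a placeholder name for the i/b DataFrame above):
import org.apache.spark.sql.functions.{col, floor, lit}
// Illustration only: 12 rows, 4 partitions -> 3 rows per group, so g = floor(i / 3).
val numPartitions = 4
val rowsPerGroup  = math.ceil(sampleDf.count().toDouble / numPartitions).toLong
val withG         = sampleDf.withColumn("g", floor(col("i") / lit(rowsPerGroup)))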
What would be the preferred way to do this in Spark?
Whilst the documentation seems a little hard to follow, and making some assumptions about the question - i.e. that it wants 4, or rather N, files as output, with the partitioning column's values in ascending order - here is my own Spark 2.4 adapted example. It takes 20 records, splits them into 4 evenly distributed partitions, and then writes them out. Here we go:
val list = sc.makeRDD((1 to 20)).map((_, 1,"2019-01-01", "2019-01-01",1,2,"XXXXXXXXXXXXXXXXXXXXXXXXXX"))
val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
df.show(false)
Showing only some of the entries:
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|customer_id|dummy|report_date|date |value_1|value_2|dummy_string |
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|1 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|2 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|3 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|4 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|5 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|6 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|7 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
...
Then - including some extra sorting for good measure, which is not required - this works for all formats:
df.repartitionByRange(4, $"customer_id")
.sortWithinPartitions("customer_id", "date", "value_1")
.write
.parquet("/tmp/SOQ6")
This results in 4 files. You can see the 4 files, with the naming of the first and last part clearly visible. Running:
val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
val words = lines.collect
lines.count
shows 5 records, with the content contiguous and sorted in line with the dataframe:
lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
res11: Long = 5
This was run against all the files, which all look like this; only one is shown here.
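As a side note, not part of the original run: the partition assignment produced by repartitionByRange (playing the role of the question's g column) can be inspected with spark_partition_id(); a minimal sketch, assuming the same df as above:
import org.apache.spark.sql.functions.spark_partition_id
// Tag each row with the id of the partition it landed in, then count rows per partition.
df.repartitionByRange(4, $"customer_id")
  .withColumn("g", spark_partition_id())
  .groupBy("g")
  .count()
  .orderBy("g")
  .show()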
Final comments
Whether this is a good idea is a different story, e.g. non-broadcast JOINs can become an issue.
In addition, I would obviously not hard-code the 4, but apply some formula for N to be passed to the repartitionByRange (or repartition) call. E.g.:
val N = some calculation based on counts in DF and your cluster
val df2 = df.repartition(N, $"c1", $"c2")
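A minimal sketch of such a formula, assuming a target number of rows per output file (the 500000 figure and the targetRowsPerFile name are assumptions, not from the original answer):
// Hypothetical sizing: aim for roughly targetRowsPerFile rows per output file.
val targetRowsPerFile = 500000L            // assumption - tune for your data and cluster
val rowCount = df.count()                  // note: this costs a pass over the data
val N = math.max(1, math.ceil(rowCount.toDouble / targetRowsPerFile).toInt)
val dfSized = df.repartitionByRange(N, $"customer_id")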
You have to test the DataFrame writer yourself, as the documentation is not entirely clear.
Checked on an EMR cluster with 2M records; the output there was 4 files as well.