如何使用 pyspark 跨集群管理数据帧的物理数据放置?
How to manage physical data placement of a dataframe across the cluster with pyspark?
假设我有一个 pyspark 数据框 'data',如下所示。我想按 "Period" 对数据进行分区。相反,我希望每个数据周期都存储在它自己的分区上(参见下面 'data' 数据帧下面的示例)。
data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))
data.show(100)
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 1| 2| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 2| 2| 0|4693.03| 0|
| 3| 1| 2|9565.93| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 1| 1|3549.66| 0|
| 5| 2| 5|3549.66| 1|
| 6| 1| 1| 401.52| 0|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
例如
分区 1:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 3| 1| 2|9565.93| 0|
| 5| 1| 1|3549.66| 0|
| 6| 1| 1| 401.52| 0|
+----+------+------+-------+---------+
分区 2:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 2| 0|14277.4| 0|
| 2| 2| 0|4693.03| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 2| 5|3549.66| 1|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
方法应该是先重新分区,得到正确的分区数(唯一周期数),然后在保存之前按Period列分区。
from pyspark.sql import functions as F
n = data.select(F.col('Period')).distinct().count()
data.repartition(n)\
.write \
.partitionBy("Period")\
.mode("overwrite")\
.format("parquet")\
.saveAsTable("testing")
假设我有一个 pyspark 数据框 'data',如下所示。我想按 "Period" 对数据进行分区。相反,我希望每个数据周期都存储在它自己的分区上(参见下面 'data' 数据帧下面的示例)。
data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))
data.show(100)
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 1| 2| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 2| 2| 0|4693.03| 0|
| 3| 1| 2|9565.93| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 1| 1|3549.66| 0|
| 5| 2| 5|3549.66| 1|
| 6| 1| 1| 401.52| 0|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
例如
分区 1:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 1| 0|14277.4| 0|
| 2| 1| 0|4741.91| 0|
| 3| 1| 2|9565.93| 0|
| 5| 1| 1|3549.66| 0|
| 6| 1| 1| 401.52| 0|
+----+------+------+-------+---------+
分区 2:
+----+------+------+-------+---------+
|Acct|Period|Status| Bal|CloseFlag|
+----+------+------+-------+---------+
| 1| 2| 0|14277.4| 0|
| 2| 2| 0|4693.03| 0|
| 3| 2| 2|9566.05| 0|
| 4| 2| 0| 462.68| 0|
| 5| 2| 5|3549.66| 1|
| 6| 2| 0| 401.52| 0|
+----+------+------+-------+---------+
方法应该是先重新分区,得到正确的分区数(唯一周期数),然后在保存之前按Period列分区。
from pyspark.sql import functions as F
n = data.select(F.col('Period')).distinct().count()
data.repartition(n)\
.write \
.partitionBy("Period")\
.mode("overwrite")\
.format("parquet")\
.saveAsTable("testing")