在 spark dataframe 写入方法中覆盖特定分区
Overwrite specific partitions in spark dataframe write method
我想覆盖特定的分区而不是所有的spark。我正在尝试以下命令:
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
其中 df 是具有要覆盖的增量数据的数据帧。
hdfs-base-path 包含主数据。
当我尝试上述命令时,它会删除所有分区,并将存在于 df 中的分区插入到 hdfs 路径中。
我的要求是仅覆盖指定 hdfs 路径中存在于 df 中的那些分区。有人可以帮我解决这个问题吗?
这是一个常见问题。 Spark 到 2.0 的唯一解决方案是直接写入分区目录,例如
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
如果您使用的是 2.0 之前的 Spark,则需要使用以下方法阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现):
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
如果您使用的是 1.6.2 之前的 Spark,您还需要删除 /root/path/to/data/partition_col=value
中的 _SUCCESS
文件,否则它的存在会破坏自动分区发现。 (我强烈建议使用 1.6.2 或更高版本。)
您可以从我在 Bulletproof Jobs 上的 Spark 峰会演讲中获得有关如何管理大型分区表的更多详细信息。
使用 Spark 1.6...
HiveContext 可以大大简化这个过程。关键是您必须首先使用定义了分区的 CREATE EXTERNAL TABLE
语句在 Hive 中创建 table。例如:
# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'
从这里开始,假设您有一个 Dataframe,其中包含特定分区(或多个分区)的新记录。您可以使用 HiveContext SQL 语句使用此 Dataframe 执行 INSERT OVERWRITE
,这将仅覆盖 Dataframe 中包含的分区的 table:
# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")
注意:此示例中的 update_dataframe
具有与目标 test
table.
匹配的架构
使用这种方法容易犯的一个错误是跳过 Hive 中的 CREATE EXTERNAL TABLE
步骤,只使用 Dataframe API 的写入方法创建 table。特别是对于 Parquet-based tables,table 将不会被适当地定义以支持 Hive 的 INSERT OVERWRITE... PARTITION
函数。
希望这对您有所帮助。
如果您使用 DataFrame,可能您想对数据使用 Hive table。
在这种情况下,您只需要调用 method
df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
它将覆盖 DataFrame 包含的分区。
没有必要指定格式 (orc),因为 Spark 将使用 Hive table 格式。
它在 Spark 1.6 版本中工作正常
你可以做这样的事情来使工作可重入(幂等):
(在 spark 2.2 上试过)
# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)
# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)
# Load the partition
df.write\
.partitionBy("partition_col")\
.saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
终于!这是 Spark 2.3.0 中的一项功能:
SPARK-20236
使用需要将spark.sql.sources.partitionOverwriteMode
设置为动态,数据集需要分区,写入模式overwrite
。示例:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
我建议在写入之前根据您的分区列重新分区,这样您就不会以每个文件夹 400 个文件而告终。
在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句删除这些分区,然后使用追加模式写入它们。
我建议您进行清理,然后使用 Append
模式写入新分区:
import scala.sys.process._
def deletePath(path: String): Unit = {
s"hdfs dfs -rm -r -skipTrash $path".!
}
df.select(partitionColumn).distinct.collect().foreach(p => {
val partition = p.getAs[String](partitionColumn)
deletePath(s"$path/$partitionColumn=$partition")
})
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
这只会删除新分区。写入数据后运行如果需要更新metastore请执行此命令:
sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
注意: deletePath
假定 hfds
命令在您的系统上可用。
我尝试了以下方法来覆盖 HIVE 中的特定分区 table。
### load Data and check records
raw_df = spark.table("test.original")
raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
### Check data in few partitions.
sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
print "Number of records: ", sample.count()
sample.show()
### Back-up the partitions before deletion
raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
### UDF : To delete particular partition.
def delete_part(table, part):
qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
spark.sql(qry)
### Delete partitions
part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
part_list = part_df.rdd.map(lambda x : x[0]).collect()
table = "test.original"
for p in part_list:
delete_part(table, p)
### Do the required Changes to the columns in partitions
df = spark.table("test.original_bkp")
newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
### Write the Partitions back to Original table
newdf.write.insertInto("test.original")
### Verify data in Original table
orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
Hope it helps.
Regards,
Neeraj
与其直接写入目标 table,我建议您创建一个类似于目标 table 的临时 table 并在其中插入您的数据。
CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';
创建 table 后,您可以将数据写入 tmpLocation
df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
然后您将通过执行以下命令恢复 table 分区路径:
MSCK REPAIR TABLE tmpTbl;
通过查询 Hive 元数据获取分区路径,例如:
SHOW PARTITONS tmpTbl;
从 trgtTbl
中删除这些分区并将目录从 tmpTbl
移动到 trgtTbl
正如 jatin 所写,您可以从配置单元和路径中删除分区,然后附加数据
因为我在上面浪费了太多时间,所以我为其他 spark 用户添加了以下示例。
我将 Scala 与 spark 2.2.1
一起使用
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
object WhosebugExample extends App {
//Prepare spark & Data
val sparkConf = new SparkConf()
sparkConf.setMaster(s"local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val tableName = "my_table"
val partitions1 = List(1, 2)
val partitions2 = List("e1", "e2")
val partitionColumns = List("partition1", "partition2")
val myTablePath = "/tmp/some_example"
val someText = List("text1", "text2")
val ids = (0 until 5).toList
val listData = partitions1.flatMap(p1 => {
partitions2.flatMap(p2 => {
someText.flatMap(
text => {
ids.map(
id => DataExample(p1, p2, text, id)
)
}
)
}
)
})
val asDataFrame = spark.createDataFrame(listData)
//Delete path function
def deletePath(path: String, recursive: Boolean): Unit = {
val p = new Path(path)
val fs = p.getFileSystem(new Configuration())
fs.delete(p, recursive)
}
def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
if (spark.catalog.tableExists(tableName)) {
//clean partitions
val asColumns = partitions.map(c => new Column(c))
val relevantPartitions = df.select(asColumns: _*).distinct().collect()
val partitionToRemove = relevantPartitions.map(row => {
val fields = row.schema.fields
s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
s"${fields.map(field => s"${field.name}='${row.getAs(field.name)}'").mkString("(", ",", ")")} PURGE"
})
val cleanFolders = relevantPartitions.map(partition => {
val fields = partition.schema.fields
path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
})
println(s"Going to clean ${partitionToRemove.size} partitions")
partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
cleanFolders.foreach(partition => deletePath(partition, true))
}
asDataFrame.write
.options(Map("path" -> myTablePath))
.mode(SaveMode.Append)
.partitionBy(partitionColumns: _*)
.saveAsTable(tableName)
}
//Now test
tableOverwrite(asDataFrame, partitionColumns, tableName)
spark.sqlContext.sql(s"select * from $tableName").show(1000)
tableOverwrite(asDataFrame, partitionColumns, tableName)
import spark.implicits._
val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
if (asLocalSet == listData.toSet) {
println("Overwrite is working !!!")
}
}
使用 Scala 在 Spark 2.3.1 上对此进行了测试。
上面的大部分答案都是写入 Hive table。但是,我想直接写入 disk,它在此文件夹顶部有一个 external hive table
。
首先需要的配置
val sparkSession: SparkSession = SparkSession
.builder
.enableHiveSupport()
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
.appName("spark_write_to_dynamic_partition_folders")
这里的用法:
DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
在 insertInto 语句中添加 'overwrite=True' 参数解决了这个问题:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
默认 overwrite=False
。将其更改为 True
允许我们覆盖 df
和 partioned_table 中包含的特定分区。这有助于我们避免用 df
.
覆盖 partioned_table 的全部内容
对于 >= Spark 2.3.0:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.insertInto("partitioned_table", overwrite=True)
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
这对我适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2)
我的解决方案意味着从 spark 数据帧开始覆盖每个特定分区。它跳过了删除分区部分。我正在使用 pyspark>=3 并在 AWS s3 上写作:
def write_df_on_s3(df, s3_path, field, mode):
# get the list of unique field values
list_partitions = [x.asDict()[field] for x in df.select(field).distinct().collect()]
df_repartitioned = df.repartition(1,field)
for p in list_partitions:
# create dataframes by partition and send it to s3
df_to_send = df_repartitioned.where("{}='{}'".format(field,p))
df_to_send.write.mode(mode).parquet(s3_path+"/"+field+"={}/".format(p))
这个简单函数的参数是 df、s3_path、分区字段和模式(覆盖或追加)。第一部分获取唯一字段值:这意味着如果我按天对 df 进行分区,我将获得 df 中所有日报的列表。然后我重新分区 df。最后,我每天选择重新分区的 df,并将其写入其特定的分区路径。
您可以根据需要更改重新分区整数。
我想覆盖特定的分区而不是所有的spark。我正在尝试以下命令:
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
其中 df 是具有要覆盖的增量数据的数据帧。
hdfs-base-path 包含主数据。
当我尝试上述命令时,它会删除所有分区,并将存在于 df 中的分区插入到 hdfs 路径中。
我的要求是仅覆盖指定 hdfs 路径中存在于 df 中的那些分区。有人可以帮我解决这个问题吗?
这是一个常见问题。 Spark 到 2.0 的唯一解决方案是直接写入分区目录,例如
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
如果您使用的是 2.0 之前的 Spark,则需要使用以下方法阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现):
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
如果您使用的是 1.6.2 之前的 Spark,您还需要删除 /root/path/to/data/partition_col=value
中的 _SUCCESS
文件,否则它的存在会破坏自动分区发现。 (我强烈建议使用 1.6.2 或更高版本。)
您可以从我在 Bulletproof Jobs 上的 Spark 峰会演讲中获得有关如何管理大型分区表的更多详细信息。
使用 Spark 1.6...
HiveContext 可以大大简化这个过程。关键是您必须首先使用定义了分区的 CREATE EXTERNAL TABLE
语句在 Hive 中创建 table。例如:
# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'
从这里开始,假设您有一个 Dataframe,其中包含特定分区(或多个分区)的新记录。您可以使用 HiveContext SQL 语句使用此 Dataframe 执行 INSERT OVERWRITE
,这将仅覆盖 Dataframe 中包含的分区的 table:
# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")
注意:此示例中的 update_dataframe
具有与目标 test
table.
使用这种方法容易犯的一个错误是跳过 Hive 中的 CREATE EXTERNAL TABLE
步骤,只使用 Dataframe API 的写入方法创建 table。特别是对于 Parquet-based tables,table 将不会被适当地定义以支持 Hive 的 INSERT OVERWRITE... PARTITION
函数。
希望这对您有所帮助。
如果您使用 DataFrame,可能您想对数据使用 Hive table。 在这种情况下,您只需要调用 method
df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
它将覆盖 DataFrame 包含的分区。
没有必要指定格式 (orc),因为 Spark 将使用 Hive table 格式。
它在 Spark 1.6 版本中工作正常
你可以做这样的事情来使工作可重入(幂等): (在 spark 2.2 上试过)
# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)
# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)
# Load the partition
df.write\
.partitionBy("partition_col")\
.saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
终于!这是 Spark 2.3.0 中的一项功能: SPARK-20236
使用需要将spark.sql.sources.partitionOverwriteMode
设置为动态,数据集需要分区,写入模式overwrite
。示例:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
我建议在写入之前根据您的分区列重新分区,这样您就不会以每个文件夹 400 个文件而告终。
在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句删除这些分区,然后使用追加模式写入它们。
我建议您进行清理,然后使用 Append
模式写入新分区:
import scala.sys.process._
def deletePath(path: String): Unit = {
s"hdfs dfs -rm -r -skipTrash $path".!
}
df.select(partitionColumn).distinct.collect().foreach(p => {
val partition = p.getAs[String](partitionColumn)
deletePath(s"$path/$partitionColumn=$partition")
})
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
这只会删除新分区。写入数据后运行如果需要更新metastore请执行此命令:
sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
注意: deletePath
假定 hfds
命令在您的系统上可用。
我尝试了以下方法来覆盖 HIVE 中的特定分区 table。
### load Data and check records
raw_df = spark.table("test.original")
raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
### Check data in few partitions.
sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
print "Number of records: ", sample.count()
sample.show()
### Back-up the partitions before deletion
raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
### UDF : To delete particular partition.
def delete_part(table, part):
qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
spark.sql(qry)
### Delete partitions
part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
part_list = part_df.rdd.map(lambda x : x[0]).collect()
table = "test.original"
for p in part_list:
delete_part(table, p)
### Do the required Changes to the columns in partitions
df = spark.table("test.original_bkp")
newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
### Write the Partitions back to Original table
newdf.write.insertInto("test.original")
### Verify data in Original table
orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
Hope it helps.
Regards,
Neeraj
与其直接写入目标 table,我建议您创建一个类似于目标 table 的临时 table 并在其中插入您的数据。
CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';
创建 table 后,您可以将数据写入 tmpLocation
df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
然后您将通过执行以下命令恢复 table 分区路径:
MSCK REPAIR TABLE tmpTbl;
通过查询 Hive 元数据获取分区路径,例如:
SHOW PARTITONS tmpTbl;
从 trgtTbl
中删除这些分区并将目录从 tmpTbl
移动到 trgtTbl
正如 jatin 所写,您可以从配置单元和路径中删除分区,然后附加数据 因为我在上面浪费了太多时间,所以我为其他 spark 用户添加了以下示例。 我将 Scala 与 spark 2.2.1
一起使用 import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
object WhosebugExample extends App {
//Prepare spark & Data
val sparkConf = new SparkConf()
sparkConf.setMaster(s"local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val tableName = "my_table"
val partitions1 = List(1, 2)
val partitions2 = List("e1", "e2")
val partitionColumns = List("partition1", "partition2")
val myTablePath = "/tmp/some_example"
val someText = List("text1", "text2")
val ids = (0 until 5).toList
val listData = partitions1.flatMap(p1 => {
partitions2.flatMap(p2 => {
someText.flatMap(
text => {
ids.map(
id => DataExample(p1, p2, text, id)
)
}
)
}
)
})
val asDataFrame = spark.createDataFrame(listData)
//Delete path function
def deletePath(path: String, recursive: Boolean): Unit = {
val p = new Path(path)
val fs = p.getFileSystem(new Configuration())
fs.delete(p, recursive)
}
def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
if (spark.catalog.tableExists(tableName)) {
//clean partitions
val asColumns = partitions.map(c => new Column(c))
val relevantPartitions = df.select(asColumns: _*).distinct().collect()
val partitionToRemove = relevantPartitions.map(row => {
val fields = row.schema.fields
s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
s"${fields.map(field => s"${field.name}='${row.getAs(field.name)}'").mkString("(", ",", ")")} PURGE"
})
val cleanFolders = relevantPartitions.map(partition => {
val fields = partition.schema.fields
path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
})
println(s"Going to clean ${partitionToRemove.size} partitions")
partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
cleanFolders.foreach(partition => deletePath(partition, true))
}
asDataFrame.write
.options(Map("path" -> myTablePath))
.mode(SaveMode.Append)
.partitionBy(partitionColumns: _*)
.saveAsTable(tableName)
}
//Now test
tableOverwrite(asDataFrame, partitionColumns, tableName)
spark.sqlContext.sql(s"select * from $tableName").show(1000)
tableOverwrite(asDataFrame, partitionColumns, tableName)
import spark.implicits._
val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
if (asLocalSet == listData.toSet) {
println("Overwrite is working !!!")
}
}
使用 Scala 在 Spark 2.3.1 上对此进行了测试。
上面的大部分答案都是写入 Hive table。但是,我想直接写入 disk,它在此文件夹顶部有一个 external hive table
。
首先需要的配置
val sparkSession: SparkSession = SparkSession
.builder
.enableHiveSupport()
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
.appName("spark_write_to_dynamic_partition_folders")
这里的用法:
DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
在 insertInto 语句中添加 'overwrite=True' 参数解决了这个问题:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
默认 overwrite=False
。将其更改为 True
允许我们覆盖 df
和 partioned_table 中包含的特定分区。这有助于我们避免用 df
.
对于 >= Spark 2.3.0:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.insertInto("partitioned_table", overwrite=True)
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
这对我适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2)
我的解决方案意味着从 spark 数据帧开始覆盖每个特定分区。它跳过了删除分区部分。我正在使用 pyspark>=3 并在 AWS s3 上写作:
def write_df_on_s3(df, s3_path, field, mode):
# get the list of unique field values
list_partitions = [x.asDict()[field] for x in df.select(field).distinct().collect()]
df_repartitioned = df.repartition(1,field)
for p in list_partitions:
# create dataframes by partition and send it to s3
df_to_send = df_repartitioned.where("{}='{}'".format(field,p))
df_to_send.write.mode(mode).parquet(s3_path+"/"+field+"={}/".format(p))
这个简单函数的参数是 df、s3_path、分区字段和模式(覆盖或追加)。第一部分获取唯一字段值:这意味着如果我按天对 df 进行分区,我将获得 df 中所有日报的列表。然后我重新分区 df。最后,我每天选择重新分区的 df,并将其写入其特定的分区路径。
您可以根据需要更改重新分区整数。