PySpark 从按文件名分区的目录写入镶木地板
PySpark write parquet from directory partitionned by file name
在 HDFS 中,我在这个文件夹中有一个文件夹 /landing/my_data/flow_name/
我有一堆 .csv
个文件:
- /landing/my_data/children_flow/20212503_1_children.csv
- /landing/my_data/children_flow/20212503_2_children.csv
- /landing/my_data/children_flow/20212503_3_children.csv
我想使用 /landing/my_data/children_flow/
目录创建一个按文件名分区的镶木地板文件,并将其放入 /staging/my_data/children_flow/
.
我希望 /staging/my_data/children_flow/
看起来像这样 :
- /staging/my_data/children_flow/file_name=20212503_1_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_2_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_3_children/*.parquet
我考虑过将 /landing/my_data/children_flow
中的每个文件读入 DataFrame,然后将文件名添加为一列,最后创建我的 parquet 文件,该文件将按 file_name
列进行分区。
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.read.csv(os.path.join("/landing/my_data/children_flow/", children_files[0]))
df = df.withColumn("file_name", lit(children_files[0].replace(".csv", "")))
children_files.pop(0)
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
但我不确定这是最好的解决方案...任何人都可以帮我解决这个问题吗?
我这样做了:
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
spark.sparkContext.emptyRDD(),
dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
我觉得使用 emptyRDD()
更好。
在 HDFS 中,我在这个文件夹中有一个文件夹 /landing/my_data/flow_name/
我有一堆 .csv
个文件:
- /landing/my_data/children_flow/20212503_1_children.csv
- /landing/my_data/children_flow/20212503_2_children.csv
- /landing/my_data/children_flow/20212503_3_children.csv
我想使用 /landing/my_data/children_flow/
目录创建一个按文件名分区的镶木地板文件,并将其放入 /staging/my_data/children_flow/
.
我希望 /staging/my_data/children_flow/
看起来像这样 :
- /staging/my_data/children_flow/file_name=20212503_1_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_2_children/*.parquet
- /staging/my_data/children_flow/file_name=20212503_3_children/*.parquet
我考虑过将 /landing/my_data/children_flow
中的每个文件读入 DataFrame,然后将文件名添加为一列,最后创建我的 parquet 文件,该文件将按 file_name
列进行分区。
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.read.csv(os.path.join("/landing/my_data/children_flow/", children_files[0]))
df = df.withColumn("file_name", lit(children_files[0].replace(".csv", "")))
children_files.pop(0)
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
但我不确定这是最好的解决方案...任何人都可以帮我解决这个问题吗?
我这样做了:
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
spark.sparkContext.emptyRDD(),
dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
我觉得使用 emptyRDD()
更好。