pyspark: How to write a DataFrame partitioned into year/month/day/hour sub-directories?
I have tab-separated data (a CSV file) like this:
201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
I want to write it out into directories grouped by year, month, day, and hour:
hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
How can I write the partitions by yyyy/mm/dd/hh?
Grouping on the column values and filtering the DataFrame will get you there.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sc = SparkSession.builder.appName("write_yyyy_mm_dd_hh_sample").getOrCreate()
df = sc.read.csv('hdfs://sample.csv', header=False, sep="\t")

partition_column = '_c0'  # First column
# Extract the yyyymmddhh part of the string (first 10 characters)
dir_yyyymmddhh = df[partition_column][1:10]
# Get the unique yyyymmddhh values in the grouping column
groups = [x[0] for x in df.select(dir_yyyymmddhh).distinct().collect()]
# Create one filtered DataFrame per group
groups_list = [df.filter(F.col(partition_column)[1:10] == x) for x in groups]

# Save each group under yyyy/mm/dd/hh
for filtered_data in groups_list:
    target_date = filtered_data.select(partition_column).take(1)[0].asDict()[partition_column]
    # Extract each directory name component
    dir_year = target_date[0:4]
    dir_month = target_date[4:6]
    dir_day = target_date[6:8]
    dir_hour = target_date[8:10]
    # Build the destination directory as yyyy/mm/dd/hh
    partitioned_directory = 'hdfs://dest/' + dir_year + '/' + dir_month + '/' + dir_day + '/' + dir_hour + '/'
    filtered_data.write.option("header", "false").option("sep", "\t").csv(partitioned_directory)
Result:
hdfs://dest/2019/11/24/01/part-0000-xxxx.csv
hdfs://dest/2019/11/25/01/part-0000-xxxx.csv
hdfs://dest/2019/11/25/02/part-0000-xxxx.csv
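To spot-check one of the written hourly directories, a quick read-back sketch using one of the example paths above (assuming the same sc session as in the code):
check = sc.read.csv("hdfs://dest/2019/11/24/01/", header=False, sep="\t")
check.show(truncate=False)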
There is already partitionBy in DataFrameWriter, which does what you need and is much simpler. There are also built-in functions for extracting the date parts from a timestamp.
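A minimal sketch of that idea (assuming the data is loaded as in the answer above, with the raw yyyyMMddHHmm string in column _c0):
import pyspark.sql.functions as F

# Parse the raw string once, then derive the partition columns from it.
ts = F.to_timestamp(F.col("_c0"), "yyyyMMddHHmm")
(df.withColumn("year", F.year(ts))
   .withColumn("month", F.month(ts))
   .withColumn("day", F.dayofmonth(ts))
   .withColumn("hour", F.hour(ts))
   .write.partitionBy("year", "month", "day", "hour")
   .option("sep", "\t")
   .csv("hdfs://dest/"))
Note that year()/month()/dayofmonth()/hour() return integers, so the directory names are not zero-padded (e.g. hour=1); the date_format approach in the answer below keeps zero-padded strings.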
Here is another solution you can consider.
Since your CSV has no header, you can apply a custom one while loading it, which makes it easy to work with the columns later:
from pyspark.sql.types import StructType, StructField, StringType

custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    schema.add(StructField(c.strip(), StringType()))

df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)
Now, create the year, month, day, and hour columns from the timestamp column, like this:
from pyspark.sql.functions import col, to_timestamp, date_format

df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
    .withColumn("year", date_format(col("timestamp"), "yyyy")) \
    .withColumn("month", date_format(col("timestamp"), "MM")) \
    .withColumn("day", date_format(col("timestamp"), "dd")) \
    .withColumn("hour", date_format(col("timestamp"), "HH")) \
    .drop("timestamp")
df_final.show(truncate=False)
+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a |2019|11 |24 |01 |
|b |2019|11 |25 |01 |
|c |2019|11 |25 |01 |
|z |2019|11 |25 |02 |
|d |2019|11 |25 |02 |
+-----+----+-----+---+----+
Finally, write the DataFrame to the destination path using partitionBy, like this:
df_final.write.partitionBy("year", "month", "day", "hour") \
.mode("overwrite") \
.option("header", "false").option("sep", "\t") \
.csv("hdfs://dest/")
The partitions will be created under the /dest/ folder.
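Note that partitionBy writes Hive-style key=value directories, so the layout on disk looks like hdfs://dest/year=2019/month=11/day=24/hour=01/part-xxxx.csv rather than the plain hdfs://dest/2019/11/24/01/ paths from the question. When you read the root path back, Spark discovers those directories and restores the partition columns; a small sketch:
df_back = spark.read.csv("hdfs://dest/", sep="\t", header=False)
df_back.printSchema()  # value column plus year/month/day/hour recovered from the directory names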