PySpark: How to write a DataFrame partitioned into year/month/day/hour sub-directories?

I have tab-separated data (a CSV file) that looks like this:

201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...

I want to write it out into directories grouped by year, month, day, and hour.

hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv

How can I write the partitions as yyyy/mm/dd/hh?

Deriving a grouping column and filtering the DataFrame on it will do this for you.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName("write_yyyy_mm_dd_hh_sample").getOrCreate()
df = spark.read.csv('hdfs://sample.csv', header=False, sep="\t")

partition_column = '_c0' # First column

# Take the first 10 characters (yyyymmddhh); the slice [1:10] is shorthand for substr(1, 10)
dir_yyyymmddhh = df[partition_column][1:10]

# Get unique yyyymmddhh values in the grouping column
groups = [x[0] for x in df.select(dir_yyyymmddhh).distinct().collect()]

# Create a filtered DataFrame
groups_list = [df.filter(F.col(partition_column)[1:10] == x) for x in groups]

# Save the result by yyyy/mm/dd/hh
for filtered_data in groups_list:
    target_date = filtered_data.select(partition_column).take(1)[0].asDict()[partition_column]
    # Extract each directory name
    dir_year = target_date[0:4]
    dir_month = target_date[4:6]
    dir_day = target_date[6:8]
    dir_hour = target_date[8:10]
    # Set destination directory by yyyy/mm/dd/hh
    partitioned_directory = 'hdfs://dest/' + dir_year + '/' + dir_month + '/' + dir_day + '/' + dir_hour + '/'
    filtered_data.write.option("header", "false").option("sep","\t").csv(partitioned_directory)

Result:

hdfs://dest/2019/11/24/01/part-0000-xxxx.csv
hdfs://dest/2019/11/25/01/part-0000-xxxx.csv
hdfs://dest/2019/11/25/02/part-0000-xxxx.csv
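
The directory name in the loop above comes from fixed-position slices of the timestamp string. As a quick check outside Spark, that slicing can be sketched in plain Python. The helper name `hour_directory` and its `base` argument are made up for this illustration, and the input is assumed to always start with a 10-digit yyyymmddhh prefix.

```python
def hour_directory(timestamp: str, base: str = "hdfs://dest") -> str:
    """Build base/yyyy/mm/dd/hh from a timestamp such as '201911240130'.

    Hypothetical helper for illustration only; it assumes the first
    10 characters are digits in yyyymmddhh order.
    """
    prefix = timestamp[:10]
    if len(prefix) != 10 or not prefix.isdigit():
        raise ValueError(f"expected a yyyymmddhh prefix, got {timestamp!r}")
    # Same fixed-position slices as in the Spark loop
    year, month, day, hour = prefix[0:4], prefix[4:6], prefix[6:8], prefix[8:10]
    return "/".join([base, year, month, day, hour])


if __name__ == "__main__":
    for ts in ["201911240130", "201911250132", "201911250223"]:
        print(ts, "->", hour_directory(ts))
```

Checking the slices this way makes it easy to spot a missing path component before any data is written.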

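DataFrameWriter already has partitionBy, which does what you need and is much simpler. There are also built-in functions for extracting date parts from a timestamp.

Here is another solution you can consider.

Since your CSV has no header, you can apply a custom schema with your own column names when loading it, which makes the columns easier to work with later: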

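from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Build a schema with one string column per name in the custom header
custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    schema.add(StructField(c.strip(), StringType()))

df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)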

Now create the columns year, month, day, and hour from the timestamp column, as follows:

from pyspark.sql.functions import col, date_format, to_timestamp

df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
           .withColumn("year", date_format(col("timestamp"), "yyyy")) \
           .withColumn("month", date_format(col("timestamp"), "MM")) \
           .withColumn("day", date_format(col("timestamp"), "dd")) \
           .withColumn("hour", date_format(col("timestamp"), "HH")) \
           .drop("timestamp")

df_final.show(truncate=False)

+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a    |2019|11   |24 |01  |
|b    |2019|11   |25 |01  |
|c    |2019|11   |25 |01  |
|z    |2019|11   |25 |02  |
|d    |2019|11   |25 |02  |
+-----+----+-----+---+----+
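
The pattern 'yyyyMMddHHmm' passed to to_timestamp uses Spark's datetime pattern syntax, which is not the same as Python's strptime codes. To check a handful of values locally without a Spark session, a rough plain-Python equivalent is sketched below. `split_timestamp` is a hypothetical name, and the function only mirrors the four derived columns; its error handling is not meant to match Spark's.

```python
from datetime import datetime


def split_timestamp(raw: str) -> dict:
    """Parse a yyyyMMddHHmm string and return zero-padded date parts.

    Hypothetical helper; '%Y%m%d%H%M' is the strptime counterpart of
    the Spark pattern 'yyyyMMddHHmm'.
    """
    parsed = datetime.strptime(raw, "%Y%m%d%H%M")
    return {
        "year": parsed.strftime("%Y"),
        "month": parsed.strftime("%m"),
        "day": parsed.strftime("%d"),
        "hour": parsed.strftime("%H"),
    }


if __name__ == "__main__":
    for raw in ["201911240130", "201911250223"]:
        print(raw, split_timestamp(raw))
```

The parts are kept as zero-padded strings, as date_format does, so that an hour like 01 does not turn into 1 in the directory name.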

Finally, write the DataFrame to the destination path using partitionBy, as follows:

df_final.write.partitionBy("year", "month", "day", "hour") \
    .mode("overwrite") \
    .option("header", "false").option("sep", "\t") \
    .csv("hdfs://dest/")

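The partitions will be created under the /dest/ folder. Note that partitionBy names the directories in key=value form (for example year=2019/month=11/day=24/hour=01), not as bare 2019/11/24/01.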
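
If downstream tools expect the bare yyyy/mm/dd/hh layout from the question, the key=value directory names that partitionBy produces would have to be mapped to it after the write. The sketch below covers only that mapping on path strings; actually renaming the directories on HDFS is not shown, and `strip_partition_keys` is a hypothetical helper name.

```python
def strip_partition_keys(path: str) -> str:
    """Turn 'year=2019/month=11/day=24/hour=01' into '2019/11/24/01'.

    Hypothetical helper; path segments without '=' are left unchanged.
    """
    segments = []
    for segment in path.split("/"):
        # Keep only the value part of key=value directory names
        _key, sep, value = segment.partition("=")
        segments.append(value if sep else segment)
    return "/".join(segments)


if __name__ == "__main__":
    print(strip_partition_keys("dest/year=2019/month=11/day=24/hour=01"))
```

Whether the rename is worth doing depends on the consumers: Spark itself can read the key=value layout back and recover the partition columns from the directory names.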