读取特定文件和 merge/union 这些模式将文件演变为单个 Spark 数据帧

Read specific files and merge/union these schema evolutionized files into single Spark dataframe

我是 Spark 的新手,我正在尝试解决以下问题。
场景
我正在尝试读取多个镶木地板文件(以及 csv 文件,如果以后可能的话)并将它们加载到 Python 中的单个 spark 数据帧中,用于特定 范围的 日期,我稍后会说明选择日期的条件。
原因:架构演变 - 在 recent/latest 分区中添加了新列,因此无法合并或我不知道。如果有办法以 高效 的方式进行 Union,也请告诉我。
文件如下所示:

s3://dir1/dir2/dir3/files/partition_date=2020-12-25/
# Partitions do not exist for weekend days, i.e., Saturday and Sunday
s3://dir1/dir2/dir3/files/partition_date=2020-12-28/
s3://dir1/dir2/dir3/files/partition_date=2020-12-29/
s3://dir1/dir2/dir3/files/partition_date=2020-12-30/  # Consider this file with new columns in it
s3://dir1/dir2/dir3/files/partition_date=2020-12-31/  # Consider this file with new columns in it

Parquet(和 csv,用于不同的文件夹)驻留在每个文件中,如下所示:

s3://dir1/dir2/dir3/files/partition_date=2020-12-31/data_2020-12-31.parquet

在架构更改之前,我曾经加载文件夹 s3://dir1/dir2/dir3/files 中存在的 一切 (所有分区),然后使用以下方法将数据插入到单个 Spark 数据帧中:

spark_df = spark.read.format('parquet').load('s3://dir1/dir2/dir3/files')

但是现在,我只想从特定日期提取文件,因为特定日期范围内的文件由于缺少分区而无法使用。所以我使用 for 循环创建了 list 来检查存在哪些分区。此列表包含分区存在的所有日期的字符串。

dates = ['2020-12-25','2020-12-26','2020-12-27','2020-12-28','2020-12-29','2020-12-30','2020-12-31'] 
# I'll retrieve these dates by other efficient ways later on
existing_dates = []
# 'for' loop implementation
existing_dates = ['2020-12-25','2020-12-28','2020-12-29','2020-12-30','2020-12-31']

所以这是我的任务:

  1. 我想为 existing_dates
  2. 中存在的 日期提取数据
  3. 我还需要合并 分区与包含新附加列的新模式进化分区(即:本例中的 2020-12-30 和 2020-12-31)
  4. 我也需要检查 partition-parquet-file 是否为空! 我遇到了这个 ,但我不知道 Pyspark 中的等效代码。
  1. 您可以使用 {} 语法来读取特定分区。
base_path = 's3://dir1/dir2/dir3/files'

# Note 1: Extra {{ is to add literal {.
# Note 2: Reading by partitions removes the partition column (partition_date) in returned dataframe by default.
#         To keep the partition_date column, add basePath option to set your parquet data path.
df = (spark.read
      .option('basePath', base_path)
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}')

# f'{base_path}/partition_date={{{",".join(existing_dates)}}}' = 
# s3://dir1/dir2/dir3/files/partition_date={2020-12-25,2020-12-28,...}

仅供参考,其他语法 [] 可以进行范围捕获。

s3://dir1/dir2/dir3/files/partition_date=2020-12-2[5-8]

将捕获 2020-12-25、2020-12-26、2020-12-27、2020-12-28 的分区。

  1. 当您读取缺少列的分区和具有额外列的其他分区时,使用 mergeSchema 选项对齐所有列。
df = (spark.read
      .option('basePath', base_path)
      .option('mergeSchema', 'true') # this should handle missing columns
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}')
  1. 我还不确定这样做的目的是什么。确定后你想做什么?我问是因为取决于任务,您可能不需要识别空镶木地板。