我可以将多个文件从 S3 读取到 Spark Dataframe 中,而忽略不存在的文件吗?

Can I read multiple files into a Spark Dataframe from S3, passing over nonexistent ones?

我想从 S3 将多个 parquet 文件读入数据帧。目前,我正在使用以下方法来执行此操作:

files = ['s3a://dev/2017/01/03/data.parquet',
         's3a://dev/2017/01/02/data.parquet']
df = session.read.parquet(*files)

如果所有文件都存在于 S3 上,则此方法有效,但我想请求将文件列表加载到数据帧中,而不会在列表中的某些文件不存在时中断。换句话说,我希望 sparkSql 加载它在数据框中找到的尽可能多的文件,并且 return 这个结果没有抱怨。这可能吗?

是的,如果您将指定输入的方法更改为 hadoop glob 模式,例如:

files = 's3a://dev/2017/01/{02,03}/data.parquet'
df = session.read.parquet(files)

您可以在 Hadoop javadoc 中阅读有关模式的更多信息。

但是,在我看来,这不是处理按时间(在您的情况下是按天)分区的数据的优雅方式。如果您能够像这样重命名目录:

  • s3a://dev/2017/01/03/data.parquet --> s3a://dev/day=2017-01-03/data.parquet
  • s3a://dev/2017/01/02/data.parquet --> s3a://dev/day=2017-01-02/data.parquet

然后您可以利用 spark partitioning 架构并通过以下方式读取数据:

session.read.parquet('s3a://dev/') \
    .where(col('day').between('2017-01-02', '2017-01-03')

这种方式也会省略 empty/non-existing 目录。其他列 day 将出现在您的数据框中(它将是 spark <2.1.0 中的字符串和 spark >= 2.1.0 中的日期时间),因此您将知道每个记录存在于哪个目录中。

我可以观察到,由于 glob-pattern 匹配包括路径的完整递归 tree-walk 和模式匹配,它绝对是对象存储的性能杀手,尤其是 S3。 spark 中有一个特殊的快捷方式来识别你的路径何时没有任何 glob 字符,在这种情况下它会做出更有效的选择。

同样,非常深的分区树,如 year/month/day 布局,意味着扫描许多目录,每个目录的成本为数百毫秒(或更糟)。

Mariusz 建议的布局应该更高效,因为它是一个更扁平的目录树 — 切换到它应该对对象存储的性能产生比实际文件系统更大的影响。

使用union

的解决方案
files = ['s3a://dev/2017/01/03/data.parquet',
         's3a://dev/2017/01/02/data.parquet']

for i, file in enumerate(files):
    act_df = spark.read.parquet(file)   
    if i == 0:
        df = act_df
    else:
        df = df.union(act_df)

一个优点是不管什么模式都可以做到。

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

import boto3


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


inputDyf = lueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://dev-test-laxman-new-bucket/"]})

我能够从 s3://dev-test-laxman-new-bucket/ 读取多个 (2) parquet 文件并写入 csv 文件。

如您所见,我的存储桶中有 2 个 parqet 文件:

希望对其他人有所帮助。