我可以将多个文件从 S3 读取到 Spark Dataframe 中,而忽略不存在的文件吗?
Can I read multiple files into a Spark Dataframe from S3, passing over nonexistent ones?
我想从 S3 将多个 parquet 文件读入数据帧。目前,我正在使用以下方法来执行此操作:
files = ['s3a://dev/2017/01/03/data.parquet',
's3a://dev/2017/01/02/data.parquet']
df = session.read.parquet(*files)
如果所有文件都存在于 S3 上,则此方法有效,但我想请求将文件列表加载到数据帧中,而不会在列表中的某些文件不存在时中断。换句话说,我希望 sparkSql 加载它在数据框中找到的尽可能多的文件,并且 return 这个结果没有抱怨。这可能吗?
是的,如果您将指定输入的方法更改为 hadoop glob 模式,例如:
files = 's3a://dev/2017/01/{02,03}/data.parquet'
df = session.read.parquet(files)
您可以在 Hadoop javadoc 中阅读有关模式的更多信息。
但是,在我看来,这不是处理按时间(在您的情况下是按天)分区的数据的优雅方式。如果您能够像这样重命名目录:
s3a://dev/2017/01/03/data.parquet
--> s3a://dev/day=2017-01-03/data.parquet
s3a://dev/2017/01/02/data.parquet
--> s3a://dev/day=2017-01-02/data.parquet
然后您可以利用 spark partitioning 架构并通过以下方式读取数据:
session.read.parquet('s3a://dev/') \
.where(col('day').between('2017-01-02', '2017-01-03')
这种方式也会省略 empty/non-existing 目录。其他列 day
将出现在您的数据框中(它将是 spark <2.1.0 中的字符串和 spark >= 2.1.0 中的日期时间),因此您将知道每个记录存在于哪个目录中。
我可以观察到,由于 glob-pattern 匹配包括路径的完整递归 tree-walk 和模式匹配,它绝对是对象存储的性能杀手,尤其是 S3。 spark 中有一个特殊的快捷方式来识别你的路径何时没有任何 glob 字符,在这种情况下它会做出更有效的选择。
同样,非常深的分区树,如 year/month/day 布局,意味着扫描许多目录,每个目录的成本为数百毫秒(或更糟)。
Mariusz 建议的布局应该更高效,因为它是一个更扁平的目录树 — 切换到它应该对对象存储的性能产生比实际文件系统更大的影响。
使用union
的解决方案
files = ['s3a://dev/2017/01/03/data.parquet',
's3a://dev/2017/01/02/data.parquet']
for i, file in enumerate(files):
act_df = spark.read.parquet(file)
if i == 0:
df = act_df
else:
df = df.union(act_df)
一个优点是不管什么模式都可以做到。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
import boto3
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
inputDyf = lueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://dev-test-laxman-new-bucket/"]})
我能够从 s3://dev-test-laxman-new-bucket/
读取多个 (2) parquet 文件并写入 csv 文件。
如您所见,我的存储桶中有 2 个 parqet 文件:
希望对其他人有所帮助。
我想从 S3 将多个 parquet 文件读入数据帧。目前,我正在使用以下方法来执行此操作:
files = ['s3a://dev/2017/01/03/data.parquet',
's3a://dev/2017/01/02/data.parquet']
df = session.read.parquet(*files)
如果所有文件都存在于 S3 上,则此方法有效,但我想请求将文件列表加载到数据帧中,而不会在列表中的某些文件不存在时中断。换句话说,我希望 sparkSql 加载它在数据框中找到的尽可能多的文件,并且 return 这个结果没有抱怨。这可能吗?
是的,如果您将指定输入的方法更改为 hadoop glob 模式,例如:
files = 's3a://dev/2017/01/{02,03}/data.parquet'
df = session.read.parquet(files)
您可以在 Hadoop javadoc 中阅读有关模式的更多信息。
但是,在我看来,这不是处理按时间(在您的情况下是按天)分区的数据的优雅方式。如果您能够像这样重命名目录:
s3a://dev/2017/01/03/data.parquet
-->s3a://dev/day=2017-01-03/data.parquet
s3a://dev/2017/01/02/data.parquet
-->s3a://dev/day=2017-01-02/data.parquet
然后您可以利用 spark partitioning 架构并通过以下方式读取数据:
session.read.parquet('s3a://dev/') \
.where(col('day').between('2017-01-02', '2017-01-03')
这种方式也会省略 empty/non-existing 目录。其他列 day
将出现在您的数据框中(它将是 spark <2.1.0 中的字符串和 spark >= 2.1.0 中的日期时间),因此您将知道每个记录存在于哪个目录中。
我可以观察到,由于 glob-pattern 匹配包括路径的完整递归 tree-walk 和模式匹配,它绝对是对象存储的性能杀手,尤其是 S3。 spark 中有一个特殊的快捷方式来识别你的路径何时没有任何 glob 字符,在这种情况下它会做出更有效的选择。
同样,非常深的分区树,如 year/month/day 布局,意味着扫描许多目录,每个目录的成本为数百毫秒(或更糟)。
Mariusz 建议的布局应该更高效,因为它是一个更扁平的目录树 — 切换到它应该对对象存储的性能产生比实际文件系统更大的影响。
使用union
files = ['s3a://dev/2017/01/03/data.parquet',
's3a://dev/2017/01/02/data.parquet']
for i, file in enumerate(files):
act_df = spark.read.parquet(file)
if i == 0:
df = act_df
else:
df = df.union(act_df)
一个优点是不管什么模式都可以做到。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
import boto3
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
inputDyf = lueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://dev-test-laxman-new-bucket/"]})
我能够从 s3://dev-test-laxman-new-bucket/
读取多个 (2) parquet 文件并写入 csv 文件。
如您所见,我的存储桶中有 2 个 parqet 文件:
希望对其他人有所帮助。