如何在一次加载中导入多个 csv 文件?

How to import multiple csv files in a single load?

假设我有一个已定义的架构,用于在一个文件夹中加载 10 个 csv 文件。有没有办法使用 Spark SQL 自动加载表。我知道这可以通过为每个文件使用单独的数据框来执行 [如下所示],但是它可以通过单个命令自动执行而不是指向文件我可以指向文件夹吗?

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/2008.csv")

使用通配符,例如将 2008 替换为 *:

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/*.csv") // <-- note the star (*)

Spark 2.0

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

备注:

  1. 使用format("csv")csv方法代替format("com.databricks.spark.csv")com.databricks.spark.csv 格式已集成到 2.0.

  2. 使用spark而不是sqlContext

Reader 的摘要:(Spark 2.x)

例如,如果您有 3 个目录保存 csv 文件:

dir1, dir2, dir3

然后您将 paths 定义为逗号分隔的路径列表字符串,如下所示:

paths = "dir1/,dir2/,dir3/*"

然后使用下面的函数并将这个paths变量传递给它

def get_df_from_csv_paths(paths):

        df = spark.read.format("csv").option("header", "false").\
            schema(custom_schema).\
            option('delimiter', '\t').\
            option('mode', 'DROPMALFORMED').\
            load(paths.split(','))
        return df

届时运行:

df = get_df_from_csv_paths(paths)

您将在 df 中获得一个单独的 spark 数据帧,其中包含来自这 3 个目录中找到的所有 csvs 的数据。

======================================== ===================================

完整版:

如果您想从多个目录中摄取多个 CSV,您只需传递一个列表并使用通配符。

例如:

如果您的 data_path 看起来像这样:

's3://bucket_name/subbucket_name/2016-09-*/184/*,
s3://bucket_name/subbucket_name/2016-10-*/184/*,
s3://bucket_name/subbucket_name/2016-11-*/184/*,
s3://bucket_name/subbucket_name/2016-12-*/184/*, ... '

您可以使用上述功能一次提取所有这些目录和子目录中的所有 csv:

这将根据指定的通配符模式获取 s3 bucket_name/subbucket_name/ 中的所有目录。例如第一个模式会在

中查找

bucket_name/subbucket_name/

对于名称以

开头的所有目录

2016-09-

并且对于每一个只取名为

的目录

184

并在该子目录中查找所有 csv 文件。

这将针对逗号分隔列表中的每个模式执行。

这比 union 更好..

使用 Spark 2.0+,我们可以使用不同的目录加载多个 CSV 文件 df = spark.read.csv(['directory_1','directory_2','directory_3'.....], header=True)。有关详细信息,请参阅文档 here

请注意,您可以使用其他技巧,例如:

-- One or more wildcard:
       .../Downloads20*/*.csv
--  braces and brackets   
       .../Downloads201[1-5]/book.csv
       .../Downloads201{11,15,19,99}/book.csv

Ex1:

正在读取单个 CSV 文件。提供完整的文件路径:

 val df = spark.read.option("header", "true").csv("C:spark\sample_data\tmp\cars1.csv")

Ex2:

读取多个传递名称的 CSV 文件:

val df=spark.read.option("header","true").csv("C:spark\sample_data\tmp\cars1.csv", "C:spark\sample_data\tmp\cars2.csv")

Ex3:

读取多个传递名称列表的 CSV 文件:

val paths = List("C:spark\sample_data\tmp\cars1.csv", "C:spark\sample_data\tmp\cars2.csv")
val df = spark.read.option("header", "true").csv(paths: _*)

Ex4:

读取文件夹中的多个 CSV 文件而忽略其他文件:

val df = spark.read.option("header", "true").csv("C:spark\sample_data\tmp\*.csv")

Ex5:

从多个文件夹读取多个 CSV 文件:

val folders = List("C:spark\sample_data\tmp", "C:spark\sample_data\tmp1")
val df = spark.read.option("header", "true").csv(folders: _*)
val df = spark.read.option("header", "true").csv("C:spark\sample_data\*.csv)

将考虑文件 tmp、tmp1、tmp2、....