按日期从 Spark 中的 S3 读取多个文件

Reading multiple files from S3 in Spark by date period

描述

我有一个应用程序,它将数据发送到 AWS Kinesis Firehose,然后将数据写入我的 S3 存储桶。 Firehose 使用 "yyyy/MM/dd/HH" 格式写入文件。

就像在这个示例 S3 路径中一样:

s3://mybucket/2016/07/29/12

现在我有一个用 Scala 编写的 Spark 应用程序,我需要在其中读取特定时间段的数据。我有开始日期和结束日期。数据采用 JSON 格式,这就是为什么我使用 sqlContext.read.json() 而不是 sc.textFile().

如何快速高效地读取数据?

我尝试了什么?

  1. 通配符 - 我可以select来自特定日期的所有时间或特定月份的所有日期的数据,例如:

    val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
    val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
    

    但是如果我必须从几天的日期期间读取数据,例如 2016-07-29 - 2016-07-30,我不能以同样的方式使用通配符方法。

    这让我想到了下一点……

  2. 使用多个路径samthebestthis 解决方案中提供的目录 CSV。似乎用逗号分隔目录只适用于 sc.textFile() 而不是 sqlContext.read.json()
  3. Union - cloud 之前 link 的第二个解决方案建议读取每个目录分开,然后将它们结合在一起。尽管他建议合并 RDD-s,但也可以选择合并 DataFrame。如果我从给定的日期期间手动生成日期字符串,那么我可能会创建一个不存在的路径,而不是忽略它,整个读取失败。相反,我可以使用 AWS SDK 并使用 AmazonS3Client 中的函数 listObjects 来获取所有密钥,就像 iMKanchwala 之前 link.[=24 中的解决方案一样=]

    唯一的问题是我的数据在不断变化。如果 read.json() 函数将所有数据作为单个参数获取,它会读取所有必要的数据,并且足够聪明,可以从数据中推断出 json 模式。如果我分别读取 2 个目录并且它们的模式不匹配,那么我认为联合这两个数据帧会成为一个问题。

  4. Glob(?) syntax - nhahtdh 的解决方案比选项好一点12 因为它们提供了更详细地指定日期和目录的选项,并且作为单个 "path" 所以它也适用于read.json().

    但是,关于丢失的目录,又出现了一个熟悉的问题。假设我想要从 20.07 到 30.07 的所有数据,我可以这样声明:

    val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
    

    但是如果我丢失了 7 月 25 日的数据,那么路径 ..16/07/25/ 不存在并且整个函数失败。

显然,当请求的时间段是 25.11.2015-12.02.2016 时,它会变得更加困难,然后我需要以编程方式(在我的 Scala 脚本中)创建一个字符串路径,如下所示:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

并且通过创建它,我需要以某种方式确保这些 25-30 和 01-12 间隔都有相应的路径,如果缺少一个,它将再次失败。 (幸运的是,Asterisk 会处理丢失的目录,因为它会读取存在的所有内容)

如何一次性从单个目录路径读取所有必要的数据,而不会因为某个日期间隔之间的目录丢失而失败?

有一个更简单的解决方案。如果您查看 DataFrameReader API,您会注意到有一个 .json(paths: String*) 方法。只需构建一个你想要的路径的集合,而不是你喜欢的 globs,然后调用该方法,例如

val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)