如何从 Apache Spark 访问 s3a:// 文件?

How to access s3a:// files from Apache Spark?

Hadoop 2.6 不支持开箱即用的 s3a,因此我尝试了一系列解决方案和修复,包括:

使用 hadoop-aws 和 aws-java-sdk 部署 => 无法读取凭据的环境变量 将 hadoop-aws 添加到 maven => 各种传递依赖冲突

有没有人成功地使两者都起作用?

使用 Hadoop 2.6 预构建的 Spark 1.4.1,通过添加 hadoop-aws 和 aws-java- 部署到 Spark Standalone 集群时,我能够使 s3a:// 正常工作sdk jar 文件从 Hadoop 2.7.1 发行版(在 Hadoop 2.7.1 的 $HADOOP_HOME/share/hadoop/tools/lib 下找到)到我的 $SPARK_HOME/conf/spark-env.sh 文件中的 SPARK_CLASSPATH 环境变量。

我使用带有 hadoop 2.6 的 Spark 1.4.1 预构建二进制文件让它工作 确保将 spark.driver.extraClassPathspark.executor.extraClassPath 都设置为指向两个 jar(hadoop-aws 和 aws-java-sdk) 如果您 运行 在集群上,请确保您的执行程序可以访问集群上的 jar 文件。

亲身体验过 s3a 和 s3n 之间的差异 - 在 s3a 上传输 7.9GB 数据大约需要 7 分钟,而在 s3n 上传输 7.9GB 数据需要 73 分钟 [us-east-1 到 us-west-1 不幸的是,在这两种情况下; Redshift 和 Lambda 目前 us-east-1] 这是堆栈中非常重要的部分,需要正确处理,值得受挫。

以下是截至 2015 年 12 月的关键部分:

  1. 您的 Spark 集群需要 Hadoop 版本 2.x 或更高版本。如果您使用 Spark EC2 安装脚本并且可能错过了它,则使用 1.0 以外的东西的开关是指定 --hadoop-major-version 2(在撰写本文时使用 CDH 4.2)。

  2. 您需要为 Hadoop 最新版本 2.7.1 添加初看起来似乎已过时的 AWS SDK 库(2014 年构建,版本为 1.7.4) (稳定):aws-java-sdk 1.7.4。据我所知,将它与 1.10.8 的特定 AWS SDK JAR 一起使用并没有破坏任何东西。

  3. 您还需要 class 路径上的 hadoop-aws 2.7.1 JAR。此 JAR 包含 class org.apache.hadoop.fs.s3a.S3AFileSystem.

  4. spark.properties 中,您可能需要一些如下所示的设置:

    spark.hadoop.fs.s3a.access.key=ACCESSKEY spark.hadoop.fs.s3a.secret.key=SECRETKEY

  5. 如果您使用带有 spark 的 hadoop 2.7 版本,则 aws 客户端使用 V2 作为默认身份验证签名。并且所有新的 aws 区域仅支持 V4 协议。要使用 V4,请在 spark-submit 中传递这些 conf,并且还必须指定端点(格式 - s3.<region>.amazonaws.com)。

--conf "spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true

--conf "spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true

我在 post I wrote 上详细介绍了这个列表,因为我已经完成了这个过程。此外,我还介绍了一路上我遇到的所有异常情况以及我认为是每个异常情况的原因以及如何解决它们。

我们将 spark 1.6.1 与 Mesos 一起使用,我们在从 spark 写入 S3 时遇到了很多问题。我感谢 cfeduke 的回答。我所做的细微改动是将 Maven 坐标添加到 spark-defaults.conf 文件中的 spark.jar 配置。我尝试使用 hadoop-aws:2.7.2,但仍然出现很多错误,所以我们回到了 2.7.1。以下是对我们有用的 spark-defaults.conf 的变化:

spark.jars.packages             net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1
spark.hadoop.fs.s3a.access.key  <MY ACCESS KEY>
spark.hadoop.fs.s3a.secret.key  <MY SECRET KEY>
spark.hadoop.fs.s3a.fast.upload true

感谢 cfeduke 花时间写下您的 post。很有帮助。

如你所说,hadoop 2.6不支持s3a,最新的spark release 1.6.1也不支持hadoop 2.7,但是spark 2.0对hadoop 2.7和s3a肯定没问题。

对于 spark 1.6.x,我们使用来自 EMR 的 s3 驱动程序做了一些肮脏的 hack...你可以看看这个文档:https://github.com/zalando/spark-appliance#emrfs-support

如果你还想尝试在spark 1.6.x中使用s3a,请参考这里的答案:

以下是截至 2016 年 10 月在 Spark 欧盟峰会上介绍的详细信息:Apache Spark and Object Stores

要点

  • 由于 risk/experience 数据损坏,直接输出提交者已从 Spark 2.0 中消失。
  • FileOutputCommitter 上有一些设置可以减少重命名,但不能消除重命名
  • 我正在与一些同事一起做一个 O(1) 提交者,依靠 Apache Dynamo 为我们提供所需的一致性。
  • 要使用 S3a,请正确设置类路径。
  • 并在 Hadoop 2 上。7.z; 2.6.x 有一些问题已在 HADOOP-11571 时解决。
  • 在 SPARK-7481 下有一个 PR,可以将所有内容都放入您自己构建的 spark 发行版中。否则,请提供二进制文件的人来完成这项工作。
  • Hadoop 2.8 将添加主要性能改进 HADOOP-11694

产品植入:HADOOP-11694 的读取性能方面包含在 HDP2.5 中; Spark and S3 documentation 可能会有兴趣 — 特别是调整选项。

我写这个答案是为了使用 S3AHadoop 2.7 上的 Spark 2.0.1 访问文件。 3

复制 Hadoop 默认附带的 AWS jar(hadoop-aws-2.7.3.jaraws-java-sdk-1.7.4.jar

  • 提示:如果 jar 位置不确定? 运行 以特权用户身份查找命令会有所帮助;命令可以是

      find / -name hadoop-aws*.jar
      find / -name aws-java-sdk*.jar
    

进入 spark 类路径,其中包含所有 spark 罐

  • 提示: 我们不能直接指向位置(它必须在 属性 文件中)因为我想为分布和 Linux 口味。 spark classpath可以通过下面的find命令识别

      find / -name spark-core*.jar
    

spark-defaults.conf

提示:(多半会放在/etc/spark/conf/spark-defaults.conf

#make sure jars are added to CLASSPATH
spark.yarn.jars=file://{spark/home/dir}/jars/*.jar,file://{hadoop/install/dir}/share/hadoop/tools/lib/*.jar


spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  
spark.hadoop.fs.s3a.access.key={s3a.access.key} 
spark.hadoop.fs.s3a.secret.key={s3a.secret.key} 
#you can set above 3 properties in hadoop level `core-site.xml` as well by removing spark prefix.

如果需要,在 spark 提交中将 jars(aws-java-sdkhadoop-aws)包含在 --driver-class-path 中。

spark-submit --master yarn \
  --driver-class-path {spark/jars/home/dir}/aws-java-sdk-1.7.4.jar \
  --driver-class-path {spark/jars/home/dir}/hadoop-aws-2.7.3.jar \
  other options

Note:

Make sure the Linux user with reading privileges, before running the find command to prevent error Permission denied

您还可以使用 spark-defaults.conf 将 S3A 依赖项添加到类路径中。

示例:

spark.driver.extraClassPath     /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.executor.extraClassPath   /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.driver.extraClassPath     /usr/local/spark/jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath   /usr/local/spark/jars/aws-java-sdk-1.7.4.jar

或者只是:

spark.jars     /usr/local/spark/jars/hadoop-aws-2.7.5.jar,/usr/local/spark/jars/aws-java-sdk-1.7.4.jar

只需确保将您的 AWS SDK 版本与 Hadoop 版本匹配即可。有关此的更多信息,请查看此答案:Unable to access S3 data using Spark 2.2

这是pyspark的解决方案(可能有代理):

def _configure_s3_protocol(spark, proxy=props["proxy"]["host"], port=props["proxy"]["port"], endpoint=props["s3endpoint"]["irland"]):
    """
    Configure access to the protocol s3
    https://sparkour.urizone.net/recipes/using-s3/
    AWS Regions and Endpoints
    https://docs.aws.amazon.com/general/latest/gr/rande.html
    """
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",  os.environ.get("AWS_ACCESS_KEY_ID"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.host", proxy)
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.port", port)
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", endpoint)
    return spark

我正在使用 spark 2.3 版,当我使用 spark 保存数据集时,如下所示:

dataset.write().format("hive").option("fileFormat", "orc").mode(SaveMode.Overwrite)
    .option("path", "s3://reporting/default/temp/job_application")
    .saveAsTable("job_application");

它完美地工作并将我的数据保存到 s3 中。

这是一个 scala 版本,可以与 Spark 3.2.1 (pre-built) 和 Hadoop 3.3.1,从非 AWS 机器访问 S3 存储桶[通常是开发人员机器上的本地设置]

sbt

 libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-streaming" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
    "org.apache.hadoop" % "hadoop-aws" % "3.3.1",
    "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided"
  )

spark 程序

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Process parquet file")
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT)
    .config(
      "spark.hadoop.fs.s3a.impl",
      "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    // The enable V4 does not seem necessary for the eu-west-3 region
    // see @stevel comment below 
    // .config("com.amazonaws.services.s3.enableV4", true)
    // .config(
    //  "spark.driver.extraJavaOptions",
    //  "-Dcom.amazonaws.services.s3.enableV4=true"
    // )
    .config("spark.executor.instances", "4")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val df = spark.read.parquet("s3a://[BUCKET NAME]/.../???.parquet")
  df.show()

注意:区域的形式是s3.[REGION].amazonaws.com,例如s3.eu-west-3.amazonaws.com

s3 配置

要使存储桶可从 AWS 外部使用,请添加以下形式的存储桶策略:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::[ACCOUNT ID]:user/[IAM USERNAME]"
            },
            "Action": [
                "s3:Delete*",
                "s3:Get*",
                "s3:List*",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::[BUCKET NAME]/*"
        }
    ]
}

提供给 spark 配置的 ACCESS_KEY 和 SECRET_KEY 必须是存储桶上配置的 IAM 用户