Unable to read json file from S3 using local PySpark

I am trying to read a json file from S3 using PySpark locally. Here is the code:

import os

import configparser
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get("default", "aws_access_key_id")
secret_key = config.get("default", "aws_secret_access_key")
session_token = config.get("default", "aws_session_token")

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("ReadGoogleTrendsData") \
        .master("local[1]") \
        .getOrCreate()

    sc = spark.sparkContext

    # hadoop_conf=sc._jsc.hadoopConfiguration()
    # hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    # hadoop_conf.set("fs.s3n.awsAccessKeyId", access_key)
    # hadoop_conf.set("fs.s3n.awsSecretAccessKey", secret_key)
    # hadoop_conf.set("fs.s3n.awsSessionToken", session_token)

    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)
    hadoop_conf.set("fs.s3a.session.token", session_token)
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "true")
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    df = spark.read.json("s3a://gamma-rp/trends-data/trends-2021-07-01.json")
    df.printSchema()
    df.show(5)

When I run this, I get the following error:
File "/Users/rppatwa/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: NB1WXDBQJ5S848AM, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: HeI/luU5fMpN1dehMo9+yDzMAXin2j7AZPo3STRqbvx56rDcUotMdze08cJz08s7P581ATdRmck=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary.apply(DataSource.scala:557)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:392)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

I looked into this error, and it seems I do have permission on the bucket, since I can download the file to my local machine with the AWS CLI cp command. Can you tell me what I am missing here? Thanks a lot!
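
For reference, a minimal standalone check along these lines, using boto3 with the same credentials file and the same bucket and key as above (this snippet is not part of the job, just a sketch), can confirm that the access key, secret key, and session token are valid outside of Spark:

import os
import configparser

import boto3  # assumes boto3 is installed locally

# Read the same "default" profile used by the Spark job above.
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
session = boto3.Session(
    aws_access_key_id=config.get("default", "aws_access_key_id"),
    aws_secret_access_key=config.get("default", "aws_secret_access_key"),
    aws_session_token=config.get("default", "aws_session_token"),
)

# HEAD the object; a 403 here would point at the credentials themselves
# rather than at the Spark/Hadoop configuration.
s3 = session.client("s3")
response = s3.head_object(Bucket="gamma-rp", Key="trends-data/trends-2021-07-01.json")
print(response["ContentLength"])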

It looks like the problem is in your configuration. I have run into similar issues before, but the following job was able to successfully save data to and retrieve data from an S3 bucket.

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession \
    .builder \
    .appName("SparkExample") \
    .getOrCreate()

spark_context = spark.sparkContext
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", '<KEY>')
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'SECRET_KEY')
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

schema = StructType().add("_id", StringType()) \
    .add("employer", StringType()) \
    .add("created_at", TimestampType()) \
    .add("name", StringType())


employees = [{'_id': 1,
              'employer': 'Microsoft',
              'created_at': datetime.now(),
              'name': 'Noel'
              },
             {'_id': 2,
              'employer': 'Apple',
              'created_at': datetime.now(),
              'name': 'Steve'
              },
             ]
df = spark.createDataFrame(employees, schema=schema)

df.write \
    .format("json") \
    .mode("append") \
    .save("s3a://<YOUR BUCKET>/employeesjson")

collect = spark.read.format("json").load(
    "s3a://<YOUR BUCKET>/employeesjson").collect()
print(len(collect))

I was able to solve this by updating Spark from a Hadoop 2.7 build to a Hadoop 3.2 build.
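
A likely reason the upgrade helps: the S3A connector shipped with Hadoop 2.7 does not pick up fs.s3a.session.token, so temporary (STS) credentials end up being rejected with a 403, while Hadoop 2.8 and later support them through the TemporaryAWSCredentialsProvider. A minimal sketch of the same job against a Hadoop 3.2 build might look like the following; the hadoop-aws:3.2.0 package version here is an assumption and should match the Hadoop version bundled with your Spark distribution.

import os
import configparser
from pyspark.sql import SparkSession

# Assumption: Spark built against Hadoop 3.2 (e.g. a -hadoop3.2 download);
# the hadoop-aws version must match the bundled Hadoop version.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.2.0" pyspark-shell'

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get("default", "aws_access_key_id")
secret_key = config.get("default", "aws_secret_access_key")
session_token = config.get("default", "aws_session_token")

spark = SparkSession \
    .builder \
    .appName("ReadGoogleTrendsData") \
    .master("local[1]") \
    .getOrCreate()

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
# Temporary credentials (access key + secret key + session token) require this
# provider, which is only available in hadoop-aws 2.8 and later.
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.session.token", session_token)

df = spark.read.json("s3a://gamma-rp/trends-data/trends-2021-07-01.json")
df.printSchema()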