Loading data from AWS S3 through Apache-Spark
I wrote Python code to load files from Amazon Web Services (AWS) S3 through Apache Spark. Specifically, the code uses SparkContext().wholeTextFiles("s3n://ruofan-bucket/data") to create an RDD from the bucket ruofan-bucket on AWS S3 and load all of the CSV files in the directory data. The code is as follows:
import os, sys, inspect

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import the pyspark
from pyspark import SparkConf, SparkContext

def main():
    ### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf=conf)

    ### Create a RDD containing metadata about files in directory "data"
    datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data")  ### Read data directory from S3 storage.

    ### Collect files from the RDD
    datafile.collect()

if __name__ == "__main__":
    main()
Before running the code, I exported the environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID. But when I run the code, it shows the error:

IOError: [Errno 2] No such file or directory: 's3n://ruofan-bucket/data/test1.csv'

I am sure the directory and the files exist on AWS S3, but I do not understand the error. I would really appreciate it if someone could help me solve this problem.
wholeTextFiles does not seem to work with Amazon S3.

See:

That said, there may be differences between Hadoop versions, so do not take this as a general rule.
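If you want to stay in PySpark, one workaround that is sometimes suggested is to hand the AWS credentials to the underlying Hadoop configuration instead of relying only on the exported environment variables. The sketch below is only an illustration of that idea, not the asker's code: it goes through the internal sc._jsc handle, reuses the bucket and path from the question, still requires an s3n-capable Hadoop build on the classpath, and the key values are placeholders.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf=conf)

# Pass the credentials to Hadoop directly instead of relying on
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY being picked up.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")      # placeholder
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")  # placeholder

# wholeTextFiles returns (path, content) pairs for every file in the directory.
pairs = sc.wholeTextFiles("s3n://ruofan-bucket/data")
for path, content in pairs.collect():
    print(path, len(content))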
You can try the following approach to load data from S3 into an RDD, then loop over the results and print them out. You can apply any transformations afterwards using Spark SQL.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession
  .builder()
  .appName("Spark SQL POC")
  .master("local")
  .getOrCreate()

import spark.implicits._

val sparkContext = spark.sparkContext
sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
// accessKey, secret and region are placeholders assumed to be defined elsewhere
sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secret)
sparkContext.hadoopConfiguration.set("fs.s3.endpoint", region)

// The schema is encoded in a string
val schemaString = "country displayName alias status userName userID exportTime city title email registrationDate firstName lastName dateOfBirth address1 address2 address3 postCode telephoneNumber nickName exclusionExpiryDate exclusionPeriod blocked blockReason lastLoginDate gender mobileNumber marketingSms marketingEmail affiliateMarker depositLimitDate depositLimitPeriod depositLimitAmount depositLimitCurrency depositLimitWaitForUpdate depositLimitUpdatePeriod depositLimitUpdateAmount depositLimitUpdateCurrency userModified selfExclusionModified userBlockModified registeredBy userNote"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Read the JSON files from S3 with the explicit schema and expose them as an RDD
val s3Users = spark.sqlContext.read.schema(schema).json("s3://asgaard-data/users/*/*/*/*/").rdd

// Apply the schema to the RDD
val usersDataFrame = spark.createDataFrame(s3Users, schema)

// Creates a temporary view using the DataFrame
usersDataFrame.createOrReplaceTempView("users")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT userName FROM users limit 10")
results.map(attributes => "UserName: " + attributes(0)).show()
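Note on the design: declaring the schema up front and passing it to the JSON reader avoids having Spark infer the schema by scanning the S3 objects, and registering the DataFrame as a temporary view is what allows plain SQL to be run over the data afterwards. The intermediate .rdd plus createDataFrame step simply re-applies the same schema; you could also keep the DataFrame returned by the reader directly.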
The versions used are as follows:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
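One caveat to verify against your own setup: on newer Hadoop builds the class org.apache.hadoop.fs.s3native.NativeS3FileSystem (and the s3/s3n schemes in general) lives in the separate hadoop-aws module, so that artifact and a matching AWS SDK jar may also need to be on the classpath for the configuration above to work.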