S3 目录上的 Spark Streaming
Spark Streaming on a S3 Directory
所以我有数千个事件通过 Amazon Kinesis 流式传输到 SQS,然后转储到 S3 目录。大约每 10 分钟,就会创建一个新的文本文件,以将数据从 Kinesis 转储到 S3 中。我想设置 Spark Streaming,以便它可以将转储到 S3 中的新文件流式传输。现在我有
import org.apache.spark.streaming._
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print
ssc.start()
但是,Spark Streaming 没有提取转储到 S3 中的新文件。我认为这与文件写入要求有关:
The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
为什么 Spark streaming 没有提取新文件?是因为 AWS 正在目录中创建文件而不是移动它们吗?我如何才能确保 Spark 提取转储到 S3 中的文件?
为了流式传输 S3 存储桶。您需要提供 S3 存储桶的路径。它将流式传输此存储桶中所有文件的所有数据。然后,每当在此存储桶中创建 w 个新文件时,它将被流式传输。如果您将数据附加到之前读取的现有文件,将不会读取这些新更新。
这是一小段有效的代码
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
//ones above this may be deprecated?
hadoopConf.set("fs.s3n.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey",mySecretKey)
val ssc = new org.apache.spark.streaming.StreamingContext(
sc,Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
希望对您有所帮助。
所以我有数千个事件通过 Amazon Kinesis 流式传输到 SQS,然后转储到 S3 目录。大约每 10 分钟,就会创建一个新的文本文件,以将数据从 Kinesis 转储到 S3 中。我想设置 Spark Streaming,以便它可以将转储到 S3 中的新文件流式传输。现在我有
import org.apache.spark.streaming._
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print
ssc.start()
但是,Spark Streaming 没有提取转储到 S3 中的新文件。我认为这与文件写入要求有关:
The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
为什么 Spark streaming 没有提取新文件?是因为 AWS 正在目录中创建文件而不是移动它们吗?我如何才能确保 Spark 提取转储到 S3 中的文件?
为了流式传输 S3 存储桶。您需要提供 S3 存储桶的路径。它将流式传输此存储桶中所有文件的所有数据。然后,每当在此存储桶中创建 w 个新文件时,它将被流式传输。如果您将数据附加到之前读取的现有文件,将不会读取这些新更新。
这是一小段有效的代码
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
//ones above this may be deprecated?
hadoopConf.set("fs.s3n.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey",mySecretKey)
val ssc = new org.apache.spark.streaming.StreamingContext(
sc,Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
希望对您有所帮助。