Spark Streaming 检查点到 amazon s3
Spark Streaming checkpoint to amazon s3
我正在尝试将 rdd 检查点设置到非 hdfs 系统。从 DSE document 来看,似乎无法使用 cassandra 文件系统。所以我打算使用 amazon s3 。但是我找不到任何使用 AWS 的好例子。
问题
- 如何使用 Amazon S3 作为检查点目录?调用
ssc.checkpoint(amazons3url) ?
- 除了hadoop文件系统之外,检查点是否可以有任何其他可靠的数据存储?
要Checkpoint到S3,可以将下面的notation传给StreamingContext def checkpoint(directory: String): Unit
方法
s3n://<aws-access-key>:<aws-secret-key>@<s3-bucket>/<prefix ...>
Spark Documentation for checkpointing, is Taychyon
中未列出的另一个可靠的文件系统
来自 link
中的答案
解决方案 1:
export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>
ssc.checkpoint(checkpointDirectory)
设置检查点目录为S3 URL -
s3n://spark-streaming/checkpoint
然后使用 spark 提交启动您的 spark 应用程序。
这适用于 spark 1.4.2
解决方案 2:
val hadoopConf: Configuration = new Configuration()
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", "id-1")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "secret-key")
StreamingContext.getOrCreate(checkPointDir, () => {
createStreamingContext(checkPointDir, config)
}, hadoopConf)
我正在尝试将 rdd 检查点设置到非 hdfs 系统。从 DSE document 来看,似乎无法使用 cassandra 文件系统。所以我打算使用 amazon s3 。但是我找不到任何使用 AWS 的好例子。
问题
- 如何使用 Amazon S3 作为检查点目录?调用 ssc.checkpoint(amazons3url) ?
- 除了hadoop文件系统之外,检查点是否可以有任何其他可靠的数据存储?
要Checkpoint到S3,可以将下面的notation传给StreamingContext def checkpoint(directory: String): Unit
方法
s3n://<aws-access-key>:<aws-secret-key>@<s3-bucket>/<prefix ...>
Spark Documentation for checkpointing, is Taychyon
中未列出的另一个可靠的文件系统来自 link
中的答案解决方案 1:
export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>
ssc.checkpoint(checkpointDirectory)
设置检查点目录为S3 URL -
s3n://spark-streaming/checkpoint
然后使用 spark 提交启动您的 spark 应用程序。
这适用于 spark 1.4.2
解决方案 2:
val hadoopConf: Configuration = new Configuration()
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", "id-1")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "secret-key")
StreamingContext.getOrCreate(checkPointDir, () => {
createStreamingContext(checkPointDir, config)
}, hadoopConf)