复制 S3 "folder" 最有效的方法是什么?
What is the most efficient way to copy a S3 "folder"?
我想找到一种有效的方法来将包含大量对象的 S3 folder/prefix 复制到同一存储桶中的另一个 folder/prefix。这是我试过的。
测试数据:大约 200
个对象,每个大约 100 MB
个。
1) aws s3 cp --recursive
。它花了大约 150 secs
。
2) s3-dist-cp
。花了大约 59 secs
.
3) spark & aws jdk, 2 threads
。花了大约 440 secs
.
4) spark & aws jdk, 64 threads
。花了大约 50 secs
.
线程确实有效,但是当它进入单个线程时,aws java sdk
方法似乎不如 aws s3 cp
方法有效。 有没有单线程编程API,性能可以媲美aws s3 cp
?或者有没有更好的复制数据?
理想情况下,我更愿意使用编程 API 以获得更大的灵活性。
以下是我使用的代码。
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
def listAllFiles(rootPath: String): Seq[String] = {
val fileSystem = FileSystem.get(URI.create(rootPath), new Configuration())
val it = fileSystem.listFiles(new Path(rootPath), true)
var files = List[String]()
while (it.hasNext) {
files = it.next().getPath.toString::files
}
files
}
def s3CopyFiles(spark: SparkSession, fromPath: String, toPath: String): Unit = {
val fromFiles = listAllFiles(fromPath)
val toFiles = fromFiles.map(_.replaceFirst(fromPath, toPath))
val fileMap = fromFiles.zip(toFiles)
s3CopyFiles(spark, fileMap)
}
def s3CopyFiles(spark: SparkSession, fileMap: Seq[(String, String)]): Unit = {
val sc = spark.sparkContext
val filePairRdd = sc.parallelize(fileMap.toList, sc.defaultParallelism)
filePairRdd.foreachPartition(it => {
val p = "s3://([^/]*)/(.*)".r
val s3 = AmazonS3ClientBuilder.defaultClient()
while (it.hasNext) {
val (p(fromBucket, fromKey), p(toBucket, toKey)) = it.next()
s3.copyObject(fromBucket, fromKey, toBucket, toKey)
}
})
}
我会推荐异步方法,例如 reactive-aws-clients。您仍然会受到 S3 节流带宽的限制,但您不需要在客户端使用大量线程。例如,您可以创建一个结构如下的 Monix 应用程序:
val future = listS3filesTask.flatMap(key => Task.now(getS3Object(key))).runAsync
Await.result(future, 100.seconds)
如果您有多个消费者,另一种可能的优化可能是使用 Torrent 协议 s3 feature,这样您就可以在消费者之间分发数据文件,每个文件只需一个 S3 GetObject
操作。
AWS SDK 传输管理器是多线程的;你告诉它你想要分割副本的块大小,它会跨线程完成并在最后合并输出。您的代码不必关心 thread/http 池的工作方式。
请记住,COPY 调用不执行 IO;每个线程发出 HTTP 请求,然后阻塞等待答案......你可以同时阻塞很多很多
我想找到一种有效的方法来将包含大量对象的 S3 folder/prefix 复制到同一存储桶中的另一个 folder/prefix。这是我试过的。
测试数据:大约 200
个对象,每个大约 100 MB
个。
1) aws s3 cp --recursive
。它花了大约 150 secs
。
2) s3-dist-cp
。花了大约 59 secs
.
3) spark & aws jdk, 2 threads
。花了大约 440 secs
.
4) spark & aws jdk, 64 threads
。花了大约 50 secs
.
线程确实有效,但是当它进入单个线程时,aws java sdk
方法似乎不如 aws s3 cp
方法有效。 有没有单线程编程API,性能可以媲美aws s3 cp
?或者有没有更好的复制数据?
理想情况下,我更愿意使用编程 API 以获得更大的灵活性。
以下是我使用的代码。
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
def listAllFiles(rootPath: String): Seq[String] = {
val fileSystem = FileSystem.get(URI.create(rootPath), new Configuration())
val it = fileSystem.listFiles(new Path(rootPath), true)
var files = List[String]()
while (it.hasNext) {
files = it.next().getPath.toString::files
}
files
}
def s3CopyFiles(spark: SparkSession, fromPath: String, toPath: String): Unit = {
val fromFiles = listAllFiles(fromPath)
val toFiles = fromFiles.map(_.replaceFirst(fromPath, toPath))
val fileMap = fromFiles.zip(toFiles)
s3CopyFiles(spark, fileMap)
}
def s3CopyFiles(spark: SparkSession, fileMap: Seq[(String, String)]): Unit = {
val sc = spark.sparkContext
val filePairRdd = sc.parallelize(fileMap.toList, sc.defaultParallelism)
filePairRdd.foreachPartition(it => {
val p = "s3://([^/]*)/(.*)".r
val s3 = AmazonS3ClientBuilder.defaultClient()
while (it.hasNext) {
val (p(fromBucket, fromKey), p(toBucket, toKey)) = it.next()
s3.copyObject(fromBucket, fromKey, toBucket, toKey)
}
})
}
我会推荐异步方法,例如 reactive-aws-clients。您仍然会受到 S3 节流带宽的限制,但您不需要在客户端使用大量线程。例如,您可以创建一个结构如下的 Monix 应用程序:
val future = listS3filesTask.flatMap(key => Task.now(getS3Object(key))).runAsync
Await.result(future, 100.seconds)
如果您有多个消费者,另一种可能的优化可能是使用 Torrent 协议 s3 feature,这样您就可以在消费者之间分发数据文件,每个文件只需一个 S3 GetObject
操作。
AWS SDK 传输管理器是多线程的;你告诉它你想要分割副本的块大小,它会跨线程完成并在最后合并输出。您的代码不必关心 thread/http 池的工作方式。
请记住,COPY 调用不执行 IO;每个线程发出 HTTP 请求,然后阻塞等待答案......你可以同时阻塞很多很多