复制 S3 "folder" 最有效的方法是什么?

What is the most efficient way to copy a S3 "folder"?

我想找到一种有效的方法来将包含大量对象的 S3 folder/prefix 复制到同一存储桶中的另一个 folder/prefix。这是我试过的。

测试数据:大约 200 个对象,每个大约 100 MB 个。

1) aws s3 cp --recursive。它花了大约 150 secs

2) s3-dist-cp。花了大约 59 secs.

3) spark & aws jdk, 2 threads。花了大约 440 secs.

4) spark & aws jdk, 64 threads。花了大约 50 secs.

线程确实有效,但是当它进入单个线程时,aws java sdk 方法似乎不如 aws s3 cp 方法有效。 有没有单线程编程API,性能可以媲美aws s3 cp?或者有没有更好的复制数据?

理想情况下,我更愿意使用编程 API 以获得更大的灵活性。

以下是我使用的代码。

  import org.apache.hadoop.fs.{FileSystem, Path}
  import java.net.URI


  def listAllFiles(rootPath: String): Seq[String] = {
    val fileSystem = FileSystem.get(URI.create(rootPath), new Configuration())
    val it = fileSystem.listFiles(new Path(rootPath), true)
    var files = List[String]()

    while (it.hasNext) {
      files = it.next().getPath.toString::files
    }

    files
  }

  def s3CopyFiles(spark: SparkSession, fromPath: String, toPath: String): Unit = {
    val fromFiles = listAllFiles(fromPath)
    val toFiles = fromFiles.map(_.replaceFirst(fromPath, toPath))
    val fileMap = fromFiles.zip(toFiles)

    s3CopyFiles(spark, fileMap)
  }

  def s3CopyFiles(spark: SparkSession, fileMap: Seq[(String, String)]): Unit = {
    val sc = spark.sparkContext
    val filePairRdd = sc.parallelize(fileMap.toList, sc.defaultParallelism)
    filePairRdd.foreachPartition(it => {
      val p = "s3://([^/]*)/(.*)".r
      val s3 = AmazonS3ClientBuilder.defaultClient()
      while (it.hasNext) {
        val (p(fromBucket, fromKey), p(toBucket, toKey)) = it.next()
        s3.copyObject(fromBucket, fromKey, toBucket, toKey)
      }
    })
  }

我会推荐异步方法,例如 reactive-aws-clients。您仍然会受到 S3 节流带宽的限制,但您不需要在客户端使用大量线程。例如,您可以创建一个结构如下的 Monix 应用程序:

val future = listS3filesTask.flatMap(key => Task.now(getS3Object(key))).runAsync
Await.result(future, 100.seconds)

如果您有多个消费者,另一种可能的优化可能是使用 Torrent 协议 s3 feature,这样您就可以在消费者之间分发数据文件,每个文件只需一个 S3 GetObject 操作。

AWS SDK 传输管理器是多线程的;你告诉它你想要分割副本的块大小,它会跨线程完成并在最后合并输出。您的代码不必关心 thread/http 池的工作方式。

请记住,COPY 调用不执行 IO;每个线程发出 HTTP 请求,然后阻塞等待答案......你可以同时阻塞很多很多