在 Google Dataproc 上使用 FileUtil.copyMerge?
Use FileUtil.copyMerge on Google Dataproc?
我想将 Spark 作业生成的多个文件合并到一个文件中。通常我会做这样的事情:
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val deleteSrcFiles = true
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), deleteSrcFiles, hadoopConfig, null)
这在本地运行良好,使用路径例如 /tmp/some/path/to.csv,但在集群 my-cluster 上执行时会导致异常:
Wrong FS: gs://myBucket/path/to/result.csv, expected: hdfs://my-cluster-m
是否可以从 Dataproc 集群上的 scala/java 代码 运行 获取 gs:// 路径的文件系统?
编辑
找到google-存储客户端库:
https://cloud.google.com/storage/docs/reference/libraries#client-libraries-install-java
您只能将属于特定文件系统的路径与该文件系统一起使用,例如您不能像上面那样将 gs:// 路径传递给 HDFS。
以下代码片段对我有用:
val hadoopConfig = new Configuration()
val srcPath = new Path("hdfs:/tmp/foo")
val hdfs = srcPath.getFileSystem(hadoopConfig)
val dstPath = new Path("gs://bucket/foo")
val gcs = dstPath.getFileSystem(hadoopConfig)
val deleteSrcFiles = true
FileUtil.copyMerge(hdfs, srcPath, gcs, dstPath, deleteSrcFiles, hadoopConfig, null)
我想将 Spark 作业生成的多个文件合并到一个文件中。通常我会做这样的事情:
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val deleteSrcFiles = true
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), deleteSrcFiles, hadoopConfig, null)
这在本地运行良好,使用路径例如 /tmp/some/path/to.csv,但在集群 my-cluster 上执行时会导致异常:
Wrong FS: gs://myBucket/path/to/result.csv, expected: hdfs://my-cluster-m
是否可以从 Dataproc 集群上的 scala/java 代码 运行 获取 gs:// 路径的文件系统?
编辑
找到google-存储客户端库: https://cloud.google.com/storage/docs/reference/libraries#client-libraries-install-java
您只能将属于特定文件系统的路径与该文件系统一起使用,例如您不能像上面那样将 gs:// 路径传递给 HDFS。
以下代码片段对我有用:
val hadoopConfig = new Configuration()
val srcPath = new Path("hdfs:/tmp/foo")
val hdfs = srcPath.getFileSystem(hadoopConfig)
val dstPath = new Path("gs://bucket/foo")
val gcs = dstPath.getFileSystem(hadoopConfig)
val deleteSrcFiles = true
FileUtil.copyMerge(hdfs, srcPath, gcs, dstPath, deleteSrcFiles, hadoopConfig, null)