通过 Spark 覆盖 HDFS file/directory

Overwriting HDFS file/directory through Spark

问题

我有一个文件保存在 HDFS 中,我想做的就是 运行 我的 spark 应用程序,计算结果 javaRDD 并使用 saveAsTextFile() 来存储新的"file" 在 HDFS 中。

但是,如果文件已经存在,Spark 的 saveAsTextFile() 将不起作用。它不会覆盖它。

我试过的

所以我搜索了一个解决方案,我发现一种可能的方法是在尝试保存新文件之前通过 HDFS API 删除文件。

我添加了代码:

FileSystem hdfs = FileSystem.get(new Configuration());
Path newFolderPath = new Path("hdfs://node1:50050/hdfs/" +filename);

if(hdfs.exists(newFolderPath)){
    System.out.println("EXISTS");
    hdfs.delete(newFolderPath, true);
}

filerdd.saveAsTextFile("/hdfs/" + filename);

当我尝试 运行 我的 Spark 应用程序时,文件已被删除,但我得到 FileNotFoundException.

考虑到当有人试图从某个路径读取文件但该文件不存在时会发生此异常,这没有意义,因为删除文件后,没有代码试图读取它.

我的部分代码

 JavaRDD<String> filerdd = sc.textFile("/hdfs/" + filename)    // load the file here
 ...
 ...
 // Transformations here
 filerdd = filerdd.map(....);
 ...
 ...

 // Delete old file here
 FileSystem hdfs = FileSystem.get(new Configuration());
 Path newFolderPath = new Path("hdfs://node1:50050/hdfs/" +filename);

 if(hdfs.exists(newFolderPath)){
    System.out.println("EXISTS");
    hdfs.delete(newFolderPath, true);
 }

 // Write new file here
 filerdd.saveAsTextFile("/hdfs/" + filename);

我想在这里做最简单的事情,但我不知道为什么这不起作用。也许 filerdd 以某种方式连接到路径??

问题是您对输入和输出使用相同的路径。 Spark 的 RDD 会延迟执行。它在您调用 saveAsTextFile 时运行。此时,您已经删除了newFolderPath。所以filerdd会抱怨。

无论如何,你不应该为输入和输出使用相同的路径。