fs.rename(new Path(raw FileName), in Path(process FileName)) 不工作

fs.rename(new Path(rawFileName), new Path(processFileName)) is not working

我正在研究基于 Scala 的 Apache Spark 实现,用于将数据从远程位置加载到 HDFS,然后将数据从 HDFS 摄取到 Hive 表。

使用我的第一个 spark 作业,我已经将 data/files 载入 HDFS 的某个位置,比如 -

hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/ folder

让我们考虑一下,在加入 CT_Click_Basic.csv 和 CT_Click_Basic1.csv.gz 文件后,我在 HDFS 中有以下文件 [共享位置的文件名将是此处的文件夹名称,其内容将出现在 part-xxxxx 中文件]:

[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/ Found 3 items

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000

-rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001

Found 2 items

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

现在使用我的另一个 Spark 作业,我想将这些文件从 /raw 文件夹移动到 /process 然后最后根据每个阶段执行的任务到 HDFS 中的 /archive 文件夹。

为此,我首先使用以下代码获取 /raw 文件夹下所有文件的列表:

    def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
           if(!fileStatus.isDirectory()) {
                filePaths+=fileStatus.getPath()      
            }
            else {
                listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
            }
        }
  }   
  filePaths
}

然后使用以下代码行,我正在尝试将 /raw 文件夹中的文件 rename/move 到 /process 文件夹:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }

但我无法使用上面编写的代码 move/rename 这些文件。我尝试调试代码,发现 fs.rename() 返回“false”。

Please Note: I am able to achieve the file renaming/movement when I copy any file manually in /data/analytics/raw folder ex CT.csv [or any other file] and then running fs.rename() but it is not working for Part-xxxxx files.

有什么我遗漏的吗?

任何快速帮助将不胜感激。

此致, 布佩什

如果新路径 (rawFileName) 不存在,重命名可以 returns false。
在 fs.rename make 检查文件是否存在之前:

if (fs.exists(somePath)) {
 fs.rename...
}

另一个原因可能是您尝试重命名的文件已被他人使用。或者,如果您尝试重命名目录,其中的某些文件可能会被其他人使用。
为确保这是根本原因,请尝试重命名代码中的其他文件:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**"))
 }

如果此重命名成功,根本原因确实是竞争条件。

终于找到问题了。实际上我试图将文件从 /data/analytics/raw/folder.csv/part-xxxxx 重命名为 /data/analytics/process/folder.csv/part-xxxxx 其中 /data/analytics/process 存在于 HDFS 但 "folder.csv" 不存在;因此它在重命名时返回 false。我在我的代码中添加了以下行并且对我来说工作正常

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size

   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

   var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
   var processFolderPath = new Path(processFolderName)
   if(!(fs.exists(processFolderPath)))
         fs.mkdirs(processFolderPath)
   val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }