Groovy: 如何根据filePattern从HDFS获取某些文件
Groovy: How to get certain files from HDFS based on filePattern
我只想从 HDFS 目录中获取包含特定名称的那些文件(我已经放置了几个名称为 2017-090-0.1 的响应文件,
2017-090-0.2、2017-090-0.3 等。现在我想确保将一定数量的 flowFile 放入 hdfs(我的意思是,如果我发送 3 个具有相关名称的请求,我将不得不检查我是否已放入三个HDFS 中的响应)对于这种情况,我使用下面的代码,但它并不获取文件,有几个我感兴趣的子项目:
- 是否可以通过 nifi 功能在没有 groovy 代码的情况下完成此任务?
我应该更改什么才能使此代码正常工作?
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
def flowFile= session.get(1);// i gues it will return list of
flowfiles
def name="";
def count=0;
def value=0;
def amount=0;
List<FlowFile> flowFiles = new ArrayList<FlowFile>();
for(def n in flowFile){
name=n.getAttribute("realName")
count=n.getAttribute("count")
value=count as Number
value=Math.round(value)
}
session.remove(flowFile)
def findFileRecursive( String directoryName, String filePattern) {
def fileFound
def directory = new File(directoryName)
if (directory.isDirectory()){
def findFilenameClosure = { if (filePattern.matcher(it.name).find()){ fileFound = it } }
directory.eachFileRecurse(findFilenameClosure)
}
amount++;
flowFiles.add(fileFound);
return fileFound
}
String filePattern=filePattern.contains(name)
String directoryName="/group/test/userDate";
findFileRecursive(directoryName,filePattern);
if(amount==count){
for(def m in flowFiles){
session.transfer(m,REL_SUCCESS);
}
}
您可以使用 GetHDFS
处理器,它将使用可验证的工作代码从 HDFS 存储中检索这些文件,并将生成的流文件路由到 success
或 failure
关系。您无需编写任何自定义代码即可执行此任务。 PutHDFS
将在必要时执行回写到 HDFS。
我只想从 HDFS 目录中获取包含特定名称的那些文件(我已经放置了几个名称为 2017-090-0.1 的响应文件, 2017-090-0.2、2017-090-0.3 等。现在我想确保将一定数量的 flowFile 放入 hdfs(我的意思是,如果我发送 3 个具有相关名称的请求,我将不得不检查我是否已放入三个HDFS 中的响应)对于这种情况,我使用下面的代码,但它并不获取文件,有几个我感兴趣的子项目:
- 是否可以通过 nifi 功能在没有 groovy 代码的情况下完成此任务?
我应该更改什么才能使此代码正常工作?
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets def flowFile= session.get(1);// i gues it will return list of flowfiles def name=""; def count=0; def value=0; def amount=0; List<FlowFile> flowFiles = new ArrayList<FlowFile>(); for(def n in flowFile){ name=n.getAttribute("realName") count=n.getAttribute("count") value=count as Number value=Math.round(value) } session.remove(flowFile) def findFileRecursive( String directoryName, String filePattern) { def fileFound def directory = new File(directoryName) if (directory.isDirectory()){ def findFilenameClosure = { if (filePattern.matcher(it.name).find()){ fileFound = it } } directory.eachFileRecurse(findFilenameClosure) } amount++; flowFiles.add(fileFound); return fileFound } String filePattern=filePattern.contains(name) String directoryName="/group/test/userDate"; findFileRecursive(directoryName,filePattern); if(amount==count){ for(def m in flowFiles){ session.transfer(m,REL_SUCCESS); } }
您可以使用 GetHDFS
处理器,它将使用可验证的工作代码从 HDFS 存储中检索这些文件,并将生成的流文件路由到 success
或 failure
关系。您无需编写任何自定义代码即可执行此任务。 PutHDFS
将在必要时执行回写到 HDFS。