通过添加新行,根据 Nifi 中的列值修改 csv

Modify csv based on a column value in Nifi by adding a new line

我有表格的csv文件

Id, Name, Class
1, Kevin,[Eight, Nine]
2, Mark,Four

如何创建一个新的 csv,如下所示

Id, Name, Class
1, Kevin,Eight
1, Kevin,Nine
2, Mark,Four

基本上,如果列 Class 具有字符串数组,则将其放在多行中以复制所有其他列值。 我可以使用 replaceText 删除 [ ] 括号并替换为空字符串。所以,基本上我有以下 csv,如果有帮助的话。

Id, Name, Class
    1, Kevin, Eight, Nine
    2, Mark,Four
  • CSV 文件的格式不正确。因此,我们需要通过用双引号括起数组元素来更正它(因为该字段中有逗号)。在这里,我们可以使用 ReplaceText 处理器将“[”替换为“[”,将“]”替换为“]”。我在这里使用了两个 ReplaceText 处理器。

现在的输出是这样的:

Id,Name,Class
1,Kevin,"[Eight,Nine]"
2,Mark,Four
  • 接下来我们将CSV数据转换成JSON。我这里使用了ConvertRecord处理器。

输出:

[ {
    "Id" : 1,
    "Name" : "Kevin",
    "Class" : "[Eight,Nine]"
}, {
    "Id" : 2,
    "Name" : "Mark",
    "Class" : "Four"
} ]
  • 我们可以将此 JSON 传递给 ExecuteScript 处理器以拆分“Class”数组。这是我使用的 ECMAScript:

代码:

var flowFile = session.get();
if (flowFile != null) {

    var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
    var IOUtils = Java.type("org.apache.commons.io.IOUtils")
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

    flowFile = session.write(flowFile,
        new StreamCallback(function(inputStream, outputStream) {
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            var json = JSON.parse(text);
            
            // Output array
            var newArray = [];

            for (index in json) {
                obj = json[index];
                
                // if the Class has '[' at the beginning
                if (obj['Class'].indexOf('[') == 0) {
                    var rightBracket = obj['Class'].indexOf(']');

                    // Get the value of Class without brackets
                    var classValue = obj['Class'].substring(1, rightBracket);

                    // We split the value with comma
                    var values = classValue.split(',');

                    // We push each value of the class in the output array
                    for (var i in values) {
                        newArray.push({
                            "Id": obj['Id'],
                            "Name": obj['Name'],
                            "Class": values[i]
                        });
                    }
                } else {
                    // Normal entry, Class is not an array
                    newArray.push(obj);
                }
            }

            outputStream.write(JSON.stringify(newArray, null, '\t').getBytes(StandardCharsets.UTF_8));
        }
    ));
    session.transfer(flowFile, REL_SUCCESS);
}

输出:

[
    {
        "Id": 1,
        "Name": "Kevin",
        "Class": "Eight"
    },
    {
        "Id": 1,
        "Name": "Kevin",
        "Class": "Nine"
    },
    {
        "Id": 2,
        "Name": "Mark",
        "Class": "Four"
    }
]
  • 现在我们可以使用另一个 ConvertRecord 处理器将此 JSON 转换为 CSV 文件。

输出:

Id,Name,Class
1,Kevin,Eight
1,Kevin,Nine
2,Mark,Four

我使用 ExecuteGroovyScript 将 [Eight, Nine] 中的字符串拆分为 ,。拆分此字符串后,我会在内容前后附加剩余内容。然后我使用 ReplaceText 删除 [ 和 ].

def flowFile = session.get()
if(!flowFile) return
try {
flowFile = session.write(flowFile, {inputStream, outputStream ->
    outputStream.withWriter("UTF-8"){ w ->
        inputStream.eachLine("UTF-8"){ line ->
         def splitArray = new String[0];
         def subString = "";
         def x = line.indexOf("[")+1;
         def y = line.indexOf("]");
         if(x > 0 && y >0)
         subString = line.substring(x,y);
         if(subString != null && subString.length() >0)
             splitArray = subString.split(',')
             if(splitArray.length > 1) {
                 def lineBefore = line.substring(0,x);
                 def lineAfter = line.substring(y,line.length());
                for(int i=0;i<splitArray.length;i++) {
                    w << lineBefore << splitArray.getAt(i) << lineAfter << '\n'
                }
            }else {
                w << line << '\n'
            }
        }
    }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
}catch(e) {
      log.error('Error capturing nextLink', e)
      session.transfer(flowFile, REL_FAILURE)
}