通过添加新行,根据 Nifi 中的列值修改 csv
Modify csv based on a column value in Nifi by adding a new line
我有表格的csv文件
Id, Name, Class
1, Kevin,[Eight, Nine]
2, Mark,Four
如何创建一个新的 csv,如下所示
Id, Name, Class
1, Kevin,Eight
1, Kevin,Nine
2, Mark,Four
基本上,如果列 Class 具有字符串数组,则将其放在多行中以复制所有其他列值。
我可以使用 replaceText 删除 [ ] 括号并替换为空字符串。所以,基本上我有以下 csv,如果有帮助的话。
Id, Name, Class
1, Kevin, Eight, Nine
2, Mark,Four
- CSV 文件的格式不正确。因此,我们需要通过用双引号括起数组元素来更正它(因为该字段中有逗号)。在这里,我们可以使用 ReplaceText 处理器将“[”替换为“[”,将“]”替换为“]”。我在这里使用了两个 ReplaceText 处理器。
现在的输出是这样的:
Id,Name,Class
1,Kevin,"[Eight,Nine]"
2,Mark,Four
- 接下来我们将CSV数据转换成JSON。我这里使用了ConvertRecord处理器。
输出:
[ {
"Id" : 1,
"Name" : "Kevin",
"Class" : "[Eight,Nine]"
}, {
"Id" : 2,
"Name" : "Mark",
"Class" : "Four"
} ]
- 我们可以将此 JSON 传递给 ExecuteScript 处理器以拆分“Class”数组。这是我使用的 ECMAScript:
代码:
var flowFile = session.get();
if (flowFile != null) {
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
flowFile = session.write(flowFile,
new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
var json = JSON.parse(text);
// Output array
var newArray = [];
for (index in json) {
obj = json[index];
// if the Class has '[' at the beginning
if (obj['Class'].indexOf('[') == 0) {
var rightBracket = obj['Class'].indexOf(']');
// Get the value of Class without brackets
var classValue = obj['Class'].substring(1, rightBracket);
// We split the value with comma
var values = classValue.split(',');
// We push each value of the class in the output array
for (var i in values) {
newArray.push({
"Id": obj['Id'],
"Name": obj['Name'],
"Class": values[i]
});
}
} else {
// Normal entry, Class is not an array
newArray.push(obj);
}
}
outputStream.write(JSON.stringify(newArray, null, '\t').getBytes(StandardCharsets.UTF_8));
}
));
session.transfer(flowFile, REL_SUCCESS);
}
输出:
[
{
"Id": 1,
"Name": "Kevin",
"Class": "Eight"
},
{
"Id": 1,
"Name": "Kevin",
"Class": "Nine"
},
{
"Id": 2,
"Name": "Mark",
"Class": "Four"
}
]
- 现在我们可以使用另一个 ConvertRecord 处理器将此 JSON 转换为 CSV 文件。
输出:
Id,Name,Class
1,Kevin,Eight
1,Kevin,Nine
2,Mark,Four
我使用 ExecuteGroovyScript 将 [Eight, Nine] 中的字符串拆分为 ,。拆分此字符串后,我会在内容前后附加剩余内容。然后我使用 ReplaceText 删除 [ 和 ].
def flowFile = session.get()
if(!flowFile) return
try {
flowFile = session.write(flowFile, {inputStream, outputStream ->
outputStream.withWriter("UTF-8"){ w ->
inputStream.eachLine("UTF-8"){ line ->
def splitArray = new String[0];
def subString = "";
def x = line.indexOf("[")+1;
def y = line.indexOf("]");
if(x > 0 && y >0)
subString = line.substring(x,y);
if(subString != null && subString.length() >0)
splitArray = subString.split(',')
if(splitArray.length > 1) {
def lineBefore = line.substring(0,x);
def lineAfter = line.substring(y,line.length());
for(int i=0;i<splitArray.length;i++) {
w << lineBefore << splitArray.getAt(i) << lineAfter << '\n'
}
}else {
w << line << '\n'
}
}
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}catch(e) {
log.error('Error capturing nextLink', e)
session.transfer(flowFile, REL_FAILURE)
}
我有表格的csv文件
Id, Name, Class
1, Kevin,[Eight, Nine]
2, Mark,Four
如何创建一个新的 csv,如下所示
Id, Name, Class
1, Kevin,Eight
1, Kevin,Nine
2, Mark,Four
基本上,如果列 Class 具有字符串数组,则将其放在多行中以复制所有其他列值。 我可以使用 replaceText 删除 [ ] 括号并替换为空字符串。所以,基本上我有以下 csv,如果有帮助的话。
Id, Name, Class
1, Kevin, Eight, Nine
2, Mark,Four
- CSV 文件的格式不正确。因此,我们需要通过用双引号括起数组元素来更正它(因为该字段中有逗号)。在这里,我们可以使用 ReplaceText 处理器将“[”替换为“[”,将“]”替换为“]”。我在这里使用了两个 ReplaceText 处理器。
现在的输出是这样的:
Id,Name,Class
1,Kevin,"[Eight,Nine]"
2,Mark,Four
- 接下来我们将CSV数据转换成JSON。我这里使用了ConvertRecord处理器。
输出:
[ {
"Id" : 1,
"Name" : "Kevin",
"Class" : "[Eight,Nine]"
}, {
"Id" : 2,
"Name" : "Mark",
"Class" : "Four"
} ]
- 我们可以将此 JSON 传递给 ExecuteScript 处理器以拆分“Class”数组。这是我使用的 ECMAScript:
代码:
var flowFile = session.get();
if (flowFile != null) {
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
flowFile = session.write(flowFile,
new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
var json = JSON.parse(text);
// Output array
var newArray = [];
for (index in json) {
obj = json[index];
// if the Class has '[' at the beginning
if (obj['Class'].indexOf('[') == 0) {
var rightBracket = obj['Class'].indexOf(']');
// Get the value of Class without brackets
var classValue = obj['Class'].substring(1, rightBracket);
// We split the value with comma
var values = classValue.split(',');
// We push each value of the class in the output array
for (var i in values) {
newArray.push({
"Id": obj['Id'],
"Name": obj['Name'],
"Class": values[i]
});
}
} else {
// Normal entry, Class is not an array
newArray.push(obj);
}
}
outputStream.write(JSON.stringify(newArray, null, '\t').getBytes(StandardCharsets.UTF_8));
}
));
session.transfer(flowFile, REL_SUCCESS);
}
输出:
[
{
"Id": 1,
"Name": "Kevin",
"Class": "Eight"
},
{
"Id": 1,
"Name": "Kevin",
"Class": "Nine"
},
{
"Id": 2,
"Name": "Mark",
"Class": "Four"
}
]
- 现在我们可以使用另一个 ConvertRecord 处理器将此 JSON 转换为 CSV 文件。
输出:
Id,Name,Class
1,Kevin,Eight
1,Kevin,Nine
2,Mark,Four
我使用 ExecuteGroovyScript 将 [Eight, Nine] 中的字符串拆分为 ,。拆分此字符串后,我会在内容前后附加剩余内容。然后我使用 ReplaceText 删除 [ 和 ].
def flowFile = session.get()
if(!flowFile) return
try {
flowFile = session.write(flowFile, {inputStream, outputStream ->
outputStream.withWriter("UTF-8"){ w ->
inputStream.eachLine("UTF-8"){ line ->
def splitArray = new String[0];
def subString = "";
def x = line.indexOf("[")+1;
def y = line.indexOf("]");
if(x > 0 && y >0)
subString = line.substring(x,y);
if(subString != null && subString.length() >0)
splitArray = subString.split(',')
if(splitArray.length > 1) {
def lineBefore = line.substring(0,x);
def lineAfter = line.substring(y,line.length());
for(int i=0;i<splitArray.length;i++) {
w << lineBefore << splitArray.getAt(i) << lineAfter << '\n'
}
}else {
w << line << '\n'
}
}
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}catch(e) {
log.error('Error capturing nextLink', e)
session.transfer(flowFile, REL_FAILURE)
}