NiFi 拆分 JSON 个数组
NiFi splitting JSON arrays
我正在尝试使用 NiFi 将 JSON 文件转换为 CSV,但我正在努力处理数组。我的 JSON 文件如下所示:
{
"id": "24",
"name": "",
"height": [
500.0,
99999.0
],
"average": [
-2.0,
-5.0
]
}
为了获得正确的 CSV 输出,我需要拆分数组,这样我的输出将如下所示(仍然是 JSON):
[
{
"id" : "24",
"name" : "",
"height" : 500,
"average" : -2
},
{
"id" : "24",
"name" : "",
"height" : 99999,
"average" : -5
}
]
我尝试了几种不同的方法,例如 JOLT 规范和在 $.height.* 处拆分我的 JSON,但似乎没有任何效果。我认为拆分 flowFile 可能是可行的方法,但如果我在 $.height.* 处拆分,我只会获取值并丢失其余的 flowfile 内容(我仍然需要拆分下一个数组)。有人可以帮助我吗?
JOLT 可能可以做到,但它在文档上有一些缺陷,所以这个 Groovy 代码应该在紧要关头为你工作 ExecuteScript
:
import static groovy.json.JsonOutput.*
def flowFile = session.get()
if (flowFile) {
def out = new ByteArrayOutputStream()
def slurper = new groovy.json.JsonSlurper()
session.exportTo(ff, out)
out.close()
def rawText = new String(out.toByteArray())
def parsed = slurper.parseText(rawText)
def results = []
//This does assume that the length of the two arrays is always the same
parsed.each { record ->
1.upto(record["height"].size() -1) { index ->
def newResult = [
id: record["id"],
name: record["name"],
height: record["height"][index],
average: record["average"][index]
]
results << newResult
}
}
flowFile = session.write({
it.write(prettyPrint(toJson(results)).bytes)
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}
随时加入 nifi-users 邮件列表,寻求有关如何使用 JOLT 解决此问题的帮助。
我终于找到了解决问题的方法,使用带有以下 Groovy 脚本的 ExecuteScript 处理器:
import static groovy.json.JsonOutput.*
import org.apache.commons.io.IOUtils
def flowFile = session.get()
if (flowFile) {
def out = new ByteArrayOutputStream()
def slurper = new groovy.json.JsonSlurper()
session.exportTo(flowFile, out)
out.close()
def rawText = new String(out.toByteArray())
def parsed = slurper.parseText(rawText)
if(parsed.mgrenze instanceof java.util.List) {
def results = []
parsed.height.eachWithIndex { item, index ->
def newResult = [
id : parsed.id,
name : parsed.name,
height : item,
average : parsed.average[index]
]
results << newResult
}
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(prettyPrint(toJson(results)).bytes)
} as OutputStreamCallback)
}
session.transfer(flowFile, REL_SUCCESS)
}
我正在尝试使用 NiFi 将 JSON 文件转换为 CSV,但我正在努力处理数组。我的 JSON 文件如下所示:
{
"id": "24",
"name": "",
"height": [
500.0,
99999.0
],
"average": [
-2.0,
-5.0
]
}
为了获得正确的 CSV 输出,我需要拆分数组,这样我的输出将如下所示(仍然是 JSON):
[
{
"id" : "24",
"name" : "",
"height" : 500,
"average" : -2
},
{
"id" : "24",
"name" : "",
"height" : 99999,
"average" : -5
}
]
我尝试了几种不同的方法,例如 JOLT 规范和在 $.height.* 处拆分我的 JSON,但似乎没有任何效果。我认为拆分 flowFile 可能是可行的方法,但如果我在 $.height.* 处拆分,我只会获取值并丢失其余的 flowfile 内容(我仍然需要拆分下一个数组)。有人可以帮助我吗?
JOLT 可能可以做到,但它在文档上有一些缺陷,所以这个 Groovy 代码应该在紧要关头为你工作 ExecuteScript
:
import static groovy.json.JsonOutput.*
def flowFile = session.get()
if (flowFile) {
def out = new ByteArrayOutputStream()
def slurper = new groovy.json.JsonSlurper()
session.exportTo(ff, out)
out.close()
def rawText = new String(out.toByteArray())
def parsed = slurper.parseText(rawText)
def results = []
//This does assume that the length of the two arrays is always the same
parsed.each { record ->
1.upto(record["height"].size() -1) { index ->
def newResult = [
id: record["id"],
name: record["name"],
height: record["height"][index],
average: record["average"][index]
]
results << newResult
}
}
flowFile = session.write({
it.write(prettyPrint(toJson(results)).bytes)
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}
随时加入 nifi-users 邮件列表,寻求有关如何使用 JOLT 解决此问题的帮助。
我终于找到了解决问题的方法,使用带有以下 Groovy 脚本的 ExecuteScript 处理器:
import static groovy.json.JsonOutput.*
import org.apache.commons.io.IOUtils
def flowFile = session.get()
if (flowFile) {
def out = new ByteArrayOutputStream()
def slurper = new groovy.json.JsonSlurper()
session.exportTo(flowFile, out)
out.close()
def rawText = new String(out.toByteArray())
def parsed = slurper.parseText(rawText)
if(parsed.mgrenze instanceof java.util.List) {
def results = []
parsed.height.eachWithIndex { item, index ->
def newResult = [
id : parsed.id,
name : parsed.name,
height : item,
average : parsed.average[index]
]
results << newResult
}
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(prettyPrint(toJson(results)).bytes)
} as OutputStreamCallback)
}
session.transfer(flowFile, REL_SUCCESS)
}