NiFi 拆分 JSON 个数组

NiFi splitting JSON arrays

我正在尝试使用 NiFi 将 JSON 文件转换为 CSV,但我正在努力处理数组。我的 JSON 文件如下所示:

{
    "id": "24",
    "name": "",
    "height": [
        500.0,
        99999.0
    ],
    "average": [
        -2.0,
        -5.0
    ]
}

为了获得正确的 CSV 输出,我需要拆分数组,这样我的输出将如下所示(仍然是 JSON):

[
{
  "id" : "24",
  "name" : "",
  "height" : 500,
  "average" : -2
},
 {
  "id" : "24",
  "name" : "",
  "height" : 99999,
  "average" : -5
 }
]

我尝试了几种不同的方法,例如 JOLT 规范和在 $.height.* 处拆分我的 JSON,但似乎没有任何效果。我认为拆分 flowFile 可能是可行的方法,但如果我在 $.height.* 处拆分,我只会获取值并丢失其余的 flowfile 内容(我仍然需要拆分下一个数组)。有人可以帮助我吗?

JOLT 可能可以做到,但它在文档上有一些缺陷,所以这个 Groovy 代码应该在紧要关头为你工作 ExecuteScript:

import static groovy.json.JsonOutput.*

def flowFile = session.get()

if (flowFile) {
    def out = new ByteArrayOutputStream()
    def slurper = new groovy.json.JsonSlurper()
    session.exportTo(ff, out)
    out.close()

    def rawText = new String(out.toByteArray())
    def parsed = slurper.parseText(rawText)

    def results = []

    //This does assume that the length of the two arrays is always the same
    parsed.each { record ->
        1.upto(record["height"].size() -1) { index ->
            def newResult = [
                id: record["id"],
                name: record["name"],
                height: record["height"][index],
                average: record["average"][index]
            ]

            results << newResult
        }
    }

    flowFile = session.write({
        it.write(prettyPrint(toJson(results)).bytes)
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
}

随时加入 nifi-users 邮件列表,寻求有关如何使用 JOLT 解决此问题的帮助。

我终于找到了解决问题的方法,使用带有以下 Groovy 脚本的 ExecuteScript 处理器:

import static groovy.json.JsonOutput.*
import org.apache.commons.io.IOUtils

def flowFile = session.get()

if (flowFile) {
    def out = new ByteArrayOutputStream()
    def slurper = new groovy.json.JsonSlurper()
    session.exportTo(flowFile, out)
    out.close()

    def rawText = new String(out.toByteArray())
    def parsed = slurper.parseText(rawText)
    
    if(parsed.mgrenze instanceof java.util.List) {

        def results = []

        parsed.height.eachWithIndex { item, index ->
            def newResult = [
              id : parsed.id,
              name : parsed.name,
              height : item,
              average : parsed.average[index]
                ]

                results << newResult
        }

        flowFile = session.write(flowFile, {outputStream ->
            outputStream.write(prettyPrint(toJson(results)).bytes)
        } as OutputStreamCallback)
    }

    session.transfer(flowFile, REL_SUCCESS)
}