NiFi 中的 Jolt 变换生成阵列

Jolt Transform Producing Arrays in NiFi

我以前从未使用过 Jolt Transform,我不确定如何修复我所做的,所以如果这实际上是一个非常简单的修复,我深表歉意。

我有两个 XML 文件(下面是虚拟版本,实际包含 PII),我在 NiFi 中使用 MergeRecord 将它们合并在一起。由于输出的方式(一个带有 JSONS 数组的流文件),建议我使用 JoltTransform 将它们正确合并在一起。有人向我指出 this 关于如何在 NiFi 中进行流媒体连接的问题(这正是我所需要的)。

虽然这在大多数情况下都有效,但我仍然遇到一个问题。我的 "base" 级别的所有标签(FatherID、FatherName、BirthDate 等)都变成了数组。我需要这些不是数组,因为我想使用我在 MergeRecord 中使用的相同组合模式(它没有那些字段作为数组)。

我是否需要在规范中更改某些内容,或者我是否需要执行另一个 JoltTransform(这没问题)?

输入XML1

<?xml version="1.0" encoding="UTF-8"?>
<FoundingFathers>
   <FatherID>1234</FatherID>
   <FatherName>George Washington</FatherName>
   <ResidentialInformation>
      <Name>Mount Vernon</Name>
      <StreetAddress>3200 Mount Vernon Hwy</StreetAddress>
       <City>Mt Vernon</City>
       <State>VA</State>
       <ZipCode>22121</ZipCode>
   </ResidentialInformation>
    <BirthDate>1732-02-22</BirthDate>
</FoundingFathers>

输入XML2

<?xml version="1.0" encoding="UTF-8"?>
<DOC>
   <DOCID>1234</DOCID>
   <FATHERNAME>George Washington</FATHERNAME>
   <RAW_TXT>George Washington lived in Mount Vernon in Mt Vernon, VA. The Washington family had owned land in the area since 1674. The original house was built in 1734 by Washington's father.</RAW_TXT>
   <TXT>
      <S>
         <FATHERNAME>George Washington</FATHERNAME>
         <ESTATENAME>Mount Vernon</>
         <ESTATEPLACE>VA</ESTATEPLACE>
      </S>
      <S>
         <OWNER>Washington family</OWNER>
         <YEAROWNED>1674</YEAROWNED>
      </S>
      <S>
         <BUILTIN>1734</BUILTIN>
         <BUILTBY>Washington's father</BUILTBY>
      </S>
   </TXT>
</DOC>

MergeRecord 配置

Record Reader: XMLReader
Record Writer: JsonRecordSetWriter
Merge Strategy: Bin-Packing Algorithm
Correlation Attribute Name: FatherID
Attribute Strategy: Keep All Unique Attributes
Minimum Number of Records: 2
Maximum Number of Records: 2
Minimum Bin Size: 0 B
Maximum Bin Size: No value set
Max Bin Age: No value set
Maximum Number of Bins: 10

架构

{
  "namespace": "ff",
  "name": "founders",
  "type": "record",
  "fields": [
    {"name":"FatherID", "type": ["string", "null"], "default": null},
    {"name":"FatherName", "type": ["string", "null"], "default": null},
    {"name":"ResidentialInformation", "type": ["null", {
      "name": "ResidentialInformation", "type": "array", "items": {
        "name": "ResidentialInformation", "type": "record", "fields": [
          {"name": "Name", "type": ["string","null"], "default":null},
          {"name": "StreetAddress", "type": ["string","null"], "default":null},
          {"name": "City", "type": ["string","null"], "default":null},
          {"name": "State", "type": ["string","null"], "default":null},
          {"name": "ZipCode", "type": ["string","null"], "default":null}
        ]
      }
    }]},
    {"name":"BirthDate", "type": ["string", "null"], "default": null},
    {"name": "DOCID", "type": ["string", "null"], "default": null},
    {"name": "FINAME", "type": ["string", "null"], "default": null},
    {"name": "CUSTNAME", "type": {"type": "array", "items": "string"}},
    {"name": "RAW_TXT", "type": {"type": "array", "items": "string"}},
    {"name": "TXT", "type": {
      "name": "TXT", "type": "record", "namespace": "txt.sar", "fields": [
        {"name": "S", "type": {
          "type": "array", "items": {
            "name": "RecordInArray", "type": "record", "fields": [
              {"name": "FATHERNAME", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "ESTATENAME", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "ESTATEPLACE", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "OWNER", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "YEAROWNED", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "BUILTIN", "type": {"type": "array", "items": ["string","null"]}},
              {"name": "BUILTBY", "type": {"type": "array", "items": ["string","null"]}}
            ]
          }
        }}
      ]
    }}
  ]}

jolt 规格(移位操作)

{
    "*": {
      "*": "&"
    }
}

实际输出

[ {
  "FatherID" : ["1234", null],
  "FatherName" : ["George Washington", null],
  "ResidentialInformation" : [ {
    "Name" : "Mount Vernon",
    "StreetAddress" : "3200 Mount Vernon Hwy",
    "City" : "Mt Vernon",
    "State" : "VA",
    "ZipCode" : "22121"
  } ],
  "BirthDate" : ["1732-02-22", null],
  "DOCID" : "1234",
  "FATHERNAME" : "George Washington",
  "RAW_TXT" : [ "\nGeorge Washington lived in Mount Vernon in Mt Vernon, VA. The Washington family had owned land in the area since 1674. The original house was built in 1734 by Washington's father.\n" ],
  "TXT" : {
    "S" : [ {
      "FATHERNAME" : [ "George Washington" ],
      "ESTATENAME" : [ "Mount Vernon" ],
      "ESTATEPLACE" : [ "VA" ]
    }, {
      "OWNER" : [ "Washington family" ],
      "YEAROWNED" : [ "1674" ]
    }, {
      "BUILTIN" : [ "1734" ],
      "BUILTBY" : [ "Washington's father" ]
    } ]
  }
} ]

预期输出

[ {
  "FatherID" : "1234",
  "FatherName" : "George Washington",
  "ResidentialInformation" : [ {
    "Name" : "Mount Vernon",
    "StreetAddress" : "3200 Mount Vernon Hwy",
    "City" : "Mt Vernon",
    "State" : "VA",
    "ZipCode" : "22121"
  } ],
  "BirthDate" : "1732-02-22",
  "DOCID" : "1234",
  "FATHERNAME" : "George Washington",
  "RAW_TXT" : [ "\nGeorge Washington lived in Mount Vernon in Mt Vernon, VA. The Washington family had owned land in the area since 1674. The original house was built in 1734 by Washington's father.\n" ],
  "TXT" : {
    "S" : [ {
      "FATHERNAME" : [ "George Washington" ],
      "ESTATENAME" : [ "Mount Vernon" ],
      "ESTATEPLACE" : [ "VA" ]
    }, {
      "OWNER" : [ "Washington family" ],
      "YEAROWNED" : [ "1674" ]
    }, {
      "BUILTIN" : [ "1734" ],
      "BUILTBY" : [ "Washington's father" ]
    } ]
  }
} ]

MergeContent 和 MergeRecord 通常用于合并两个或多个架构相同的流文件,例如将单个 JSON 对象捆绑到一个更大的数组中。

您可以使用 XMLFileLookupService 来使用 LookupRecord,它将获取第二个 XML 文件的内容并将其插入到您选择的位置的第一个流文件的记录中.我不确定的部分是您如何查找 DOCID(以匹配 FatherID),然后 return 顶级 DOC 元素中的每个字段。

如果这不起作用,您可以随时尝试 ExecuteScript 或 InvokeScriptedProcessor(或 ScriptedLookupService)手动执行 "merge"。

EDIT(附加信息):如果 JOLT 规范始终将非空值放在首位(或者数组中的所有元素都为空或相同),您可能能够向您的链添加一个规范,用其数组中的第一个元素替换字段值。