Nifi MergeRecord Processor 合并空值

Nifi MergeRecord Processor to merge null values

我正在拆分字段列表并尝试在最后合并它们。我有两种字段,标准字段和自定义字段。我处理自定义字段的方式与标准字段不同。

{
 "standardfield1" : "fieldValue1",
  "customField1" : "customValue"
}

这些必须翻译成

{ 
  "standardfield1" : "fieldValue1",
  "customFields" : [
   { "type" : "customfield",
     "id" : 1212 //this is id of the customField1, retrieved at run time
     "value" :  "customValue"
   } ]
}

我的 mergeRecord 架构设置为

{
  "name": "custom field",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "value", "type": "string" }

  ]
}

并且根据我的需要,我将标准字段的内容设置为新的流文件属性,因为我可以从中提取它,并将空值放入流文件内容中。

因此,自定义字段和标准字段都连接到 mergeRecord 处理器。

只要有效负载中有自定义字段,它就可以很好地工作。如果只有标准字段而没有自定义字段,那么 mergeRecord 处理器不会合并任何东西,也不会失败,它只会抛出 NullPointerException 并且 flowfile 永远卡在队列中。

我想让 mergeRecord 处理器合并空的内容流文件。

如有任何帮助,我们将不胜感激

我不确定我是否完全理解您的用例,但是对于您在上面的输入,如果您将 extracted/populated customField1 的 ID 放入属性(我们称之为 myId)中,那么您可以使用 JoltTransformJSON 使用此 Chain 规范获得上面所需的输出:

[
  {
    "operation": "shift",
    "spec": {
      "standardfield1": "standardfield1",
      "customField*": {
        "@": "customFields.[&(1,1)].value",
        "#customfield": "customFields.[&(1,1)].type",
        "#${myId}": "customFields.[&(1,1)].id"
      }
    }
  },
  {
    "operation": "remove",
    "spec": {
      "customFields": {
        "0": ""
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "customFields": {
        "*": {
          "id": "=toInteger"
        }
      }
    }
  }
]

如果存在 customField,这将创建 customFields 数组,并使用上面的值(包括 myId 属性的值)填充它。如果你愿意,你可以调整一些东西(比如向上面的链添加一个默认规范)来为 customFields 添加一个空数组(例如,为了保持模式快乐)。 如果我误解了你的意思,请告诉我,我会尽力提供帮助。