Nifi MergeRecord Processor 合并空值
Nifi MergeRecord Processor to merge null values
我正在拆分字段列表并尝试在最后合并它们。我有两种字段,标准字段和自定义字段。我处理自定义字段的方式与标准字段不同。
{
"standardfield1" : "fieldValue1",
"customField1" : "customValue"
}
这些必须翻译成
{
"standardfield1" : "fieldValue1",
"customFields" : [
{ "type" : "customfield",
"id" : 1212 //this is id of the customField1, retrieved at run time
"value" : "customValue"
} ]
}
我的 mergeRecord 架构设置为
{
"name": "custom field",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "type", "type": "string" },
{ "name": "value", "type": "string" }
]
}
并且根据我的需要,我将标准字段的内容设置为新的流文件属性,因为我可以从中提取它,并将空值放入流文件内容中。
因此,自定义字段和标准字段都连接到 mergeRecord 处理器。
只要有效负载中有自定义字段,它就可以很好地工作。如果只有标准字段而没有自定义字段,那么 mergeRecord 处理器不会合并任何东西,也不会失败,它只会抛出 NullPointerException 并且 flowfile 永远卡在队列中。
我想让 mergeRecord 处理器合并空的内容流文件。
如有任何帮助,我们将不胜感激
我不确定我是否完全理解您的用例,但是对于您在上面的输入,如果您将 extracted/populated customField1 的 ID 放入属性(我们称之为 myId)中,那么您可以使用 JoltTransformJSON 使用此 Chain 规范获得上面所需的输出:
[
{
"operation": "shift",
"spec": {
"standardfield1": "standardfield1",
"customField*": {
"@": "customFields.[&(1,1)].value",
"#customfield": "customFields.[&(1,1)].type",
"#${myId}": "customFields.[&(1,1)].id"
}
}
},
{
"operation": "remove",
"spec": {
"customFields": {
"0": ""
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"customFields": {
"*": {
"id": "=toInteger"
}
}
}
}
]
如果存在 customField,这将创建 customFields 数组,并使用上面的值(包括 myId 属性的值)填充它。如果你愿意,你可以调整一些东西(比如向上面的链添加一个默认规范)来为 customFields 添加一个空数组(例如,为了保持模式快乐)。
如果我误解了你的意思,请告诉我,我会尽力提供帮助。
我正在拆分字段列表并尝试在最后合并它们。我有两种字段,标准字段和自定义字段。我处理自定义字段的方式与标准字段不同。
{
"standardfield1" : "fieldValue1",
"customField1" : "customValue"
}
这些必须翻译成
{
"standardfield1" : "fieldValue1",
"customFields" : [
{ "type" : "customfield",
"id" : 1212 //this is id of the customField1, retrieved at run time
"value" : "customValue"
} ]
}
我的 mergeRecord 架构设置为
{
"name": "custom field",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "type", "type": "string" },
{ "name": "value", "type": "string" }
]
}
并且根据我的需要,我将标准字段的内容设置为新的流文件属性,因为我可以从中提取它,并将空值放入流文件内容中。
因此,自定义字段和标准字段都连接到 mergeRecord 处理器。
只要有效负载中有自定义字段,它就可以很好地工作。如果只有标准字段而没有自定义字段,那么 mergeRecord 处理器不会合并任何东西,也不会失败,它只会抛出 NullPointerException 并且 flowfile 永远卡在队列中。
我想让 mergeRecord 处理器合并空的内容流文件。
如有任何帮助,我们将不胜感激
我不确定我是否完全理解您的用例,但是对于您在上面的输入,如果您将 extracted/populated customField1 的 ID 放入属性(我们称之为 myId)中,那么您可以使用 JoltTransformJSON 使用此 Chain 规范获得上面所需的输出:
[
{
"operation": "shift",
"spec": {
"standardfield1": "standardfield1",
"customField*": {
"@": "customFields.[&(1,1)].value",
"#customfield": "customFields.[&(1,1)].type",
"#${myId}": "customFields.[&(1,1)].id"
}
}
},
{
"operation": "remove",
"spec": {
"customFields": {
"0": ""
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"customFields": {
"*": {
"id": "=toInteger"
}
}
}
}
]
如果存在 customField,这将创建 customFields 数组,并使用上面的值(包括 myId 属性的值)填充它。如果你愿意,你可以调整一些东西(比如向上面的链添加一个默认规范)来为 customFields 添加一个空数组(例如,为了保持模式快乐)。 如果我误解了你的意思,请告诉我,我会尽力提供帮助。