Nifi RecordReader & RecordWriter 序列化错误。非法类型转换异常;无法转换 class 的值;因为不支持该类型

Nifi RecordReader & RecordWriter serialization error. IllegalTypeConversionException; Cannot convert value of class; because the type is not supported

我正在尝试使用 Apache Nifi 中的 JoltTransformRecord 将 json 转换为 json。当我尝试在 https://jolt-demo.appspot.com/ 中转换 json 时,我得到了正确的结果。这没关系。

但是,当我尝试使用 JoltTransformRecord 转换 json 时,它会抛出异常。错误是; "Cannot convert value of class [Ljava.lang.Object; because the type is not supported"。但我不明白为什么我会收到此错误。我确实检查了我的输入和输出模式,但我没有找到任何东西。他们看起来是正确的。

下面给出了我的输入和输出 json 示例、颠簸规格、输入和输出模式。此外,为此,我正在使用 JsonTreeReaderJsonRecordSetWriter

--- 我该如何解决这个问题? ---

示例输入 json for JoltTransformRecord(在这个例子中,数组中只有一个 json 对象。但实际上,有数组中有很多 json 个对象。);

[ {
  "uuid" : "MFMS1-MC5",
  "componentId" : "path1",
  "Samples" : {
    "PathFeedrate" : [ {
      "dataItemId" : "pf",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68104",
      "value" : "425.5333",
      "name" : "Fact",
      "subType" : "ACTUAL"
    }, {
      "dataItemId" : "pf",
      "timestamp" : "2019-03-01T21:48:30.244219Z",
      "sequence" : "68117",
      "value" : "0",
      "name" : "Fact",
      "subType" : "ACTUAL"
    } ]
  },
  "Events" : {
    "SequenceNumber" : [ {
      "dataItemId" : "seq",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68105",
      "value" : "0",
      "name" : "sequenceNum"
    } ],
    "Unit" : [ {
      "dataItemId" : "unit",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68106",
      "value" : "13",
      "name" : "unitNum"
    } ]
  }
}]

示例输出Json我想要;

{
  "DataItems" : [ {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Samples",
    "type" : "PathFeedrate",
    "dataItemId" : "pf",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68104",
    "value" : "425.5333",
    "name" : "Fact",
    "subType" : "ACTUAL"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Samples",
    "type" : "PathFeedrate",
    "dataItemId" : "pf",
    "timestamp" : "2019-03-01T21:48:30.244219Z",
    "sequence" : "68117",
    "value" : "0",
    "name" : "Fact",
    "subType" : "ACTUAL"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Events",
    "type" : "SequenceNumber",
    "dataItemId" : "seq",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68105",
    "value" : "0",
    "name" : "sequenceNum"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Events",
    "type" : "Unit",
    "dataItemId" : "unit",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68106",
    "value" : "13",
    "name" : "unitNum"
  } ]
}

我的 Jolt 规格;

[
  {
    "operation": "shift",
    "spec": {
      "Samples": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "": "Items.&2[#2].eventType",
            "": "Items.&2[#2].type",
            "*": "Items.&2[#2].&"
          }
        }
      },
      "Events": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "": "Items.&2[#2].eventType",
            "": "Items.&2[#2].type",
            "*": "Items.&2[#2].&"
          }
        }
      },
      "Condition": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "": "Items.&2[#2].eventType",
            "": "Items.&2[#2].value",
            "*": "Items.&2[#2].&"
          }
        }
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "Items": {
        "*": {
          "*": "DataItems[]"
        }
      }
    }
  }
]

此规范工作正常。因为我在Jolt transform demo中已经试过了。

我正在使用 JsonTreeReaderJoltTransformRecord 中读取 json。这是我的 输入模式 ;

{
    "name": "Items",
    "namespace": "Items",
    "type": "record",
    "fields": [
        {
            "name": "uuid",
            "type": "string"
        },
        {
            "name": "componentId",
            "type": "string"
        },
        {
            "name": "Samples",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "SamplesDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "value",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "sampleRate",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "statistic",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "duration",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "sampleCount",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "resetTriggered",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        },
        {
            "name": "Events",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "EventsDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "value",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "resetTriggered",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        },
        {
            "name": "Condition",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "ConditionDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "type",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "nativeCode",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "nativeSeverity",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "qualifier",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "statistic",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        }
    ]
}

我正在使用 JsonRecordSetWriter 将转换后的结果写入 JoltTransformRecord。这是我的 输出模式 ;

{
    "name": "Items",
    "type": "record",
    "namespace": "Items",
    "fields": [
        {
            "name": "DataItems",
            "type": {
                "type": "array",
                "items": {
                    "name": "DataItems",
                    "type": "record",
                    "fields": [
                        {
                            "name": "uuid",
                            "type": "string"
                        },
                        {
                            "name": "componentId",
                            "type": "string"
                        },
                        {
                            "name": "eventType",
                            "type": "string"
                        },
                        {
                            "name": "type",
                            "type": "string"
                        },
                        {
                            "name": "dataItemId",
                            "type": "string"
                        },
                        {
                            "name": "timestamp",
                            "type": "string"
                        },
                        {
                            "name": "value",
                            "type": "string"
                        },
                        {
                            "name": "name",
                            "type": ["null", "string"],
                            "default": null
                        },
                        {
                            "name": "subType",
                            "type": ["null", "string"],
                            "default": null
                        }
                    ]
                }
            }
        }
    ]
} 

这确实是记录处理实用程序中的错误,我已编写 NIFI-6105 来解决此问题。抓得好!

作为解决方法,由于您有 JSON 作为输入和输出,您可以使用 JoltTransformJson 而不是 JoltTransformRecord。或者,如果您知道地图中的键(例如 PathFeedrate),您可以更改模式以将其视为记录而不是地图,这可能会让您绕过错误。