需要帮助推断 NiFi 中 json 文件的 avro 模式

Need Help infering an avro schema for a json file in NiFi

我正在尝试在 NiFi 中创建一个流,该流采用有效的 json 文件并使用 PutHiveStreaming 处理器将其直接放入配置单元 table 中。我的 json 看起来像下面这样:

{
"Raw_Json": {
    "SystemInfo": {
        "Id": "a string ID",
        "TM": null,
        "CountID": "a string ID",
        "Topic": null,
        "AccountID": "some number",
        "StationID": "some number",
        "STime": "some Timestamp",
        "ETime": "some Timestamp"
    },
    "Profile": {
        "ID": "ID number",
        "ProductID": "Some Number",
        "City": "City Name",
        "State": "State Name",
        "Number": "XXX-XXX-XXXX",
        "ExtNumber": null,
        "Unit": null,
        "Name": "Person Name",
        "Service": "Purchase",
        "AddrID": "00000000",
        "Products": {
            "Product": [{
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"
            }]
        }
    },
    "Total": {
        "Amount": "some amount",
        "Delivery": "some address",
        "Estimate": "some amount",
        "Tax": null,
        "Delivery_Type": null

    }

},
"partition_date":"2017-05-19"

}

我正在使用 InferAvroSchema 处理器获取 json,然后使用推断的 avro 模式将 json 转换为 avro 格式并将其发送到 PutHiveStreaming 处理器。我的流程看起来像这样:

主要目标是我希望将所有 "Raw_Json" 列转储到配置单元 table 中的一列中,并且 table 将由 "partition_date" 列,这将是 table 的第二列。问题是,出于某种原因,NiFi 在从 "Raw_Json" 列推断嵌套的 json 时遇到问题,并将其像 Null 一样转储到 table 上,如下所示:

有谁知道如何让 NiFi 将 "Raw_Json" 列的整个嵌套 Json 作为一列读取并将其发送到配置单元 table?我怎样才能为它创建自己的 avro 模式来做到这一点?非常感谢任何有关如何解决此问题的见解或想法!

通常,只要您的输入文件格式假定始终相同,您只需创建或生成(推断)avro 模式一次 - 两个字段 Raw_Jsonpartition_date.

你应该在文件中有这样的东西,例如 avro-schema.json:

{
  "type" : "record",
  "name" : "test",
  "fields" : [ {
    "name" : "Raw_Json",
    "type" : 
    ...
  }, {
    "name" : "partition_date",
    "type" : "string",
    "doc" : "Type inferred from '\"2017-05-19\"'"
  } ]
}

并将此文件用作 ConvertJSONToAvro 处理器中的 Record Schema

列的类型 Raw_Json:

或者您必须使用所有嵌套字段、数组等完全定义复杂数据类型

或者如果你想将 Raw_Json 的内容写入字符串列,那么你必须在将文件转换为 avro 之前将其转换为字符串。您可以使用 EvaluateJsonPathAttributesToJson 处理器的顺序。