如何在 NiFi 中正确解析嵌套的 Avro 记录?
How does one parse nested Avro records correctly in NiFi?
我有大致遵循以下格式的传入 Avro 记录。我能够阅读它们并将它们转换为现有的 NiFi 流程。但是,最近的更改要求我从这些文件中读取并解析嵌套记录,在此示例中为 employers
。我阅读了 Apache NiFi 博客 post、Record-Oriented Data with NiFi
但无法弄清楚如何让 AvroRecordReader 解析嵌套记录。
{
"name": "recordFormatName",
"namespace": "nifi.examples",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "gender", "type": "string" },
{ "name": "employers",
"type": "record",
"fields": [
{"name": "company", "type": "string"},
{"name": "guid", "type": "string"},
{"name": "streetaddress", "type": "string"},
{"name": "city", "type": "string"}
]}
]
}
我希望实现的是为每个 recordFormatName
记录读取 employers
记录并使用 PutDatabaseRecord 处理器跟踪看到的 employers
值的流程。目前的计划是将记录插入 MySQL 数据库。正如下面的答案所建议的,我计划使用 PartitionRecord 根据 employers
子记录中的值对记录进行排序。我不需要此特定流程的顶级详细信息。
我已尝试使用 AvroRecordReader 进行解析,但无法弄清楚如何指定嵌套记录。这是否可以单独使用 AvroRecordReader 或进行预处理来完成,比如说需要先进行 JOLT 转换?
编辑:在收到响应后添加了有关数据库的更多详细信息。
您的目标数据库是什么?您的目标 table 是什么样的? PutDatabaseRecord 可能无法处理嵌套记录,除非您的数据库、驱动程序和目标 table 支持它们。
或者,您可能需要使用 UpdateRecord to flatten the "employers" object into fields at the top level of the record. This is a manual process (until NIFI-4398 is implemented), but you only have 4 fields. After flattening the records, you could use PartitionRecord 来获取具有特定值(例如 employers.company)的所有记录。从 PartitionRecord 传出的流文件在技术上将构成分区字段的不同值。我不确定你在用不同的价值观做什么,但如果你能详细说明,我很乐意提供帮助。
我有大致遵循以下格式的传入 Avro 记录。我能够阅读它们并将它们转换为现有的 NiFi 流程。但是,最近的更改要求我从这些文件中读取并解析嵌套记录,在此示例中为 employers
。我阅读了 Apache NiFi 博客 post、Record-Oriented Data with NiFi
但无法弄清楚如何让 AvroRecordReader 解析嵌套记录。
{
"name": "recordFormatName",
"namespace": "nifi.examples",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "gender", "type": "string" },
{ "name": "employers",
"type": "record",
"fields": [
{"name": "company", "type": "string"},
{"name": "guid", "type": "string"},
{"name": "streetaddress", "type": "string"},
{"name": "city", "type": "string"}
]}
]
}
我希望实现的是为每个 recordFormatName
记录读取 employers
记录并使用 PutDatabaseRecord 处理器跟踪看到的 employers
值的流程。目前的计划是将记录插入 MySQL 数据库。正如下面的答案所建议的,我计划使用 PartitionRecord 根据 employers
子记录中的值对记录进行排序。我不需要此特定流程的顶级详细信息。
我已尝试使用 AvroRecordReader 进行解析,但无法弄清楚如何指定嵌套记录。这是否可以单独使用 AvroRecordReader 或进行预处理来完成,比如说需要先进行 JOLT 转换?
编辑:在收到响应后添加了有关数据库的更多详细信息。
您的目标数据库是什么?您的目标 table 是什么样的? PutDatabaseRecord 可能无法处理嵌套记录,除非您的数据库、驱动程序和目标 table 支持它们。
或者,您可能需要使用 UpdateRecord to flatten the "employers" object into fields at the top level of the record. This is a manual process (until NIFI-4398 is implemented), but you only have 4 fields. After flattening the records, you could use PartitionRecord 来获取具有特定值(例如 employers.company)的所有记录。从 PartitionRecord 传出的流文件在技术上将构成分区字段的不同值。我不确定你在用不同的价值观做什么,但如果你能详细说明,我很乐意提供帮助。