Apache Nifi:使用 UpdateRecord 处理器解析数据

Apache Nifi: Parse data with UpdateRecord Processor

我正在尝试使用 UpdateRecord 处理器解析 Nifi (1.7.1) 中的一些数据。 原始数据是 json 文件,我想根据模式将其转换为 Avro。 Avro 转换没问题,但在该转换中我还需要将一个数组元素从 json 数据解析为 Avro 中的不同结构。 这是输入 json:

的示例数据
{  "geometry" : {
"coordinates" : [ [ 4.963087975800593, 45.76365595859971 ], [ 4.962874487781098, 45.76320922779652 ], [ 4.962815443439148, 45.763116079159374 ], [ 4.962744732112515, 45.763010484202866 ], [ 4.962096825239138, 45.762112721939246 ] ]}  ...}

作为其架构(在 RecordReader 中指定):

{  "type": "record",
  "name": "features",
  "fields": [
    {
      "name": "geometry",
      "type": {
        "type": "record",
        "name": "geometry",
        "fields": [
          {
            "name": "coordinatesJson",
            "type": {
              "type": "array",
              "items": {
                "type": "array",
                "items": "double"
              }
            }
          },
        ]
      }
    },
    ....
  ]
} 

如你所见,坐标是一个数组的数组。

我需要根据此架构(在 RecordWriter 中指定)将这些数据解析为 Avro:

{
  "name": "outputdata",
  "type": "record",
  "fields": [
    {"name": "coordinatesAvro",
      "type": {
        "type": "array",
        "items" : {
        "type" : "record",
        "name" : "coordinatesAvro",
        "fields" : [ {
          "name" : "X",
          "type" : "double"
        }, {
          "name" : "Y",
          "type" : "double"
        } ]
      }
      }
    },
    .....

  ]
}   

这里的问题是我无法使用 RecordPath 函数从 coordinatesJson 解析到 coordinatesAvro 我尝试了几种映射,例如:

Property:                            Value:
/coordinatesJson[0..-1]/X            /geometry/coordinatesAvro[*][0]
/coordinatesJson[0..-1]/Y            /geometry/coordinatesAvro[*][1]

这应该是一个非常简单的解析步骤,但正如我所说,我一直在兜圈子以实现这一目标。

任何帮助将不胜感激。

当我遇到类似的东西时,我接下来会做: 1) 通过 ExecuteScript 处理器将 Json 转换为 Json 我需要的结构(例如在你的情况下:coordinatesAvro)。我使用了 ECMAScript,因为您可以简单地解析 JSON 并使用对象(转换它们)。 2) ConvertJsonToAvro 使用一种通用模式(在您的情况下为 coordinatesAvro)用于 Reader 和 Writer。 它工作得很好,我已经在 BigData 案例中使用过它。这是您的问题的可能解决方案之一。