在 NIFI PutS3Object 处理器中访问 FlowFile 内容

Accessing FlowFile content in NIFI PutS3Object Processor

我是 NIFI 新手,想将数据从 Kafka 推送到 S3 存储桶。我正在使用 PutS3Object 处理器,如果我将 Bucket 值硬编码为 mphdf/orderEvent,则可以将数据推送到 S3,但我想根据 FlowFile 内容中的字段指定存储桶,即在 Json。那么,如果 Json 内容是 {"menu": {"type": "file","value": "File"}},我可以将 Bucket 属性 的值设为 mphdf/$.menu.type 吗?我试图这样做并得到以下错误。我想知道是否有办法使用 PutS3Object 处理器访问 FlowFile 内容并使 Bucket 名称可配置,或者我是否必须构建自己的处理器?

ERROR [Timer-Driven Process Thread-10]
o.a.nifi.processors.aws.s3.PutS3Object
com.amazonaws.services.s3.model.AmazonS3Exception: The XML you
provided was not well-formed or did not validate against our
published schema (Service: Amazon S3; Status Code: 400; Error Code:
MalformedXML; Request ID: 77DF07828CBA0E5F)

我相信您想要做的是使用 EvaluateJSONPath 处理器,它根据 JSON 内容评估任意 JSONPath 表达式并将结果提取到流文件属性。然后,您可以在 PutS3Object 配置中使用 NiFi 表达式语言引用流文件属性(请参阅您的第一个 属性 Object Key,它引用了 ${filename})。这样,您将评估 $.menu.type 并将其存储到 EvaluateJSONPath 处理器中的属性 menuType 中,然后在 PutS3Object 中您将拥有 mphdf/${menuType}

您可能需要尝试一下,但我认为这应该可行。