来自 KafkaConsumer 的 NiFi Flowfile 属性

NiFi Flowfile Attributes from KafkaConsumer

我一直在尝试从 Spark Streaming 中的 Kafka 消息访问 NiFi Flowfile 属性。我正在使用 Java 作为语言。

场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用 publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 以进行进一步处理,并使用 NiFi 处理器保存到 HDFS。

我的问题是我无法跟踪二进制文件名和解码后的 ASCII 文件。我必须在解码后的 ASCII 中添加一个 header 部分(用于文件名、文件大小、记录计数等),但我无法弄清楚如何从 KafkaConsumer object 访问 NiFi Flowfile 中的文件名。有没有办法使用标准的 NiFi 处理器来做到这一点?或者请分享任何其他建议来实现此功能。谢谢。

所以你的数据流是:

FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?

目前Kafka在每条消息上没有元数据属性(虽然我相信这可能会在Kafka 0.11中出现),所以当NiFi向主题发布消息时,它目前无法传递流文件属性与消息。

您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容发布为给 Kafka 的一条消息。

此外,我不确切知道您在 Spark 流式传输作业中做了什么,但是您有什么理由不能在 NiFi 中只做那部分吗?它听起来不像涉及窗口或连接的任何复杂内容,因此您可以稍微简化一些事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。