来自 KafkaConsumer 的 NiFi Flowfile 属性
NiFi Flowfile Attributes from KafkaConsumer
我一直在尝试从 Spark Streaming 中的 Kafka 消息访问 NiFi Flowfile 属性。我正在使用 Java 作为语言。
场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用 publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 以进行进一步处理,并使用 NiFi 处理器保存到 HDFS。
我的问题是我无法跟踪二进制文件名和解码后的 ASCII 文件。我必须在解码后的 ASCII 中添加一个 header 部分(用于文件名、文件大小、记录计数等),但我无法弄清楚如何从 KafkaConsumer object 访问 NiFi Flowfile 中的文件名。有没有办法使用标准的 NiFi 处理器来做到这一点?或者请分享任何其他建议来实现此功能。谢谢。
所以你的数据流是:
FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS
?
目前Kafka在每条消息上没有元数据属性(虽然我相信这可能会在Kafka 0.11中出现),所以当NiFi向主题发布消息时,它目前无法传递流文件属性与消息。
您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容发布为给 Kafka 的一条消息。
此外,我不确切知道您在 Spark 流式传输作业中做了什么,但是您有什么理由不能在 NiFi 中只做那部分吗?它听起来不像涉及窗口或连接的任何复杂内容,因此您可以稍微简化一些事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。
我一直在尝试从 Spark Streaming 中的 Kafka 消息访问 NiFi Flowfile 属性。我正在使用 Java 作为语言。
场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用 publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 以进行进一步处理,并使用 NiFi 处理器保存到 HDFS。
我的问题是我无法跟踪二进制文件名和解码后的 ASCII 文件。我必须在解码后的 ASCII 中添加一个 header 部分(用于文件名、文件大小、记录计数等),但我无法弄清楚如何从 KafkaConsumer object 访问 NiFi Flowfile 中的文件名。有没有办法使用标准的 NiFi 处理器来做到这一点?或者请分享任何其他建议来实现此功能。谢谢。
所以你的数据流是:
FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?
目前Kafka在每条消息上没有元数据属性(虽然我相信这可能会在Kafka 0.11中出现),所以当NiFi向主题发布消息时,它目前无法传递流文件属性与消息。
您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容发布为给 Kafka 的一条消息。
此外,我不确切知道您在 Spark 流式传输作业中做了什么,但是您有什么理由不能在 NiFi 中只做那部分吗?它听起来不像涉及窗口或连接的任何复杂内容,因此您可以稍微简化一些事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。