Nifi中如何查看Kafka的消费消息?
How do I view the consumed messages of Kafka in Nifi?
我已经启动了一个 Nifi 进程(使用 Kafka)并将其连接到一个主题。它是 运行,但我无法(不知道)在哪里可以查看消息?
您可能需要使用消息 --from-beginning
如果之前已经使用过这些消息(因此已经提交了偏移量)。
在 GetKafka 处理器上,有一个 属性 Auto Offset Reset 应该设置为 smallest 这是等效的Kafka 控制台消费者中的 --from-beginning
个。
ConsumeKafka 处理器运行并为每条消息生成流文件。只有当您将处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在移动的数据。
对于初学者,您可以试试这个:
- 将
ConsumeKafka
连接到 LogAttribute
或任何其他处理器
那很重要。
- 停止或禁用 [=11=] 处理器。
- 现在什么时候
您开始
ConsumeKafka
,所有收到的消息来自
配置好的Kafka topic会以flowfiles的形式排队。
- 右键单击流文件排队的关系,然后
点击
List Queue
即可进入队列。
- 点击任意项目
队列,将出现一个上下文菜单。单击
View
按钮,您
可以看到数据。
整个"viewing" Kafka消息的解释只是为了帮助你调试和开始使用NiFi。理想情况下,您将使用其他 NiFi 处理器来处理您的用例。
示例
您收到来自 Kafka 的消息并希望将其写入 MongoDB,因此您可以将流程设置为:
注意:
有基于记录的处理器,如 ConsumeKafkaRecord
和 PutMongoRecord
,但它们基本上做同样的事情,但有更多增强。由于您是新手,我建议了一个简单的流程。您可以找到有关基于 Record
的处理器 here 的详细信息并尝试一下。
我已经启动了一个 Nifi 进程(使用 Kafka)并将其连接到一个主题。它是 运行,但我无法(不知道)在哪里可以查看消息?
您可能需要使用消息 --from-beginning
如果之前已经使用过这些消息(因此已经提交了偏移量)。
在 GetKafka 处理器上,有一个 属性 Auto Offset Reset 应该设置为 smallest 这是等效的Kafka 控制台消费者中的 --from-beginning
个。
ConsumeKafka 处理器运行并为每条消息生成流文件。只有当您将处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在移动的数据。
对于初学者,您可以试试这个:
- 将
ConsumeKafka
连接到LogAttribute
或任何其他处理器 那很重要。 - 停止或禁用 [=11=] 处理器。
- 现在什么时候
您开始
ConsumeKafka
,所有收到的消息来自 配置好的Kafka topic会以flowfiles的形式排队。 - 右键单击流文件排队的关系,然后
点击
List Queue
即可进入队列。 - 点击任意项目
队列,将出现一个上下文菜单。单击
View
按钮,您 可以看到数据。
整个"viewing" Kafka消息的解释只是为了帮助你调试和开始使用NiFi。理想情况下,您将使用其他 NiFi 处理器来处理您的用例。
示例
您收到来自 Kafka 的消息并希望将其写入 MongoDB,因此您可以将流程设置为:
ConsumeKafkaRecord
和 PutMongoRecord
,但它们基本上做同样的事情,但有更多增强。由于您是新手,我建议了一个简单的流程。您可以找到有关基于 Record
的处理器 here 的详细信息并尝试一下。