Nifi中如何查看Kafka的消费消息?

How do I view the consumed messages of Kafka in Nifi?

我已经启动了一个 Nifi 进程(使用 Kafka)并将其连接到一个主题。它是 运行,但我无法(不知道)在哪里可以查看消息?

您可能需要使用消息 --from-beginning 如果之前已经使用过这些消息(因此已经提交了偏移量)。

在 GetKafka 处理器上,有一个 属性 A​​uto Offset Reset 应该设置为 smallest 这是等效的Kafka 控制台消费者中的 --from-beginning 个。

ConsumeKafka 处理器运行并为每条消息生成流文件。只有当您将处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在移动的数据。

对于初学者,您可以试试这个:

  • ConsumeKafka 连接到 LogAttribute 或任何其他处理器 那很重要。
  • 停止或禁用 [​​=11=] 处理器。
  • 现在什么时候 您开始 ConsumeKafka,所有收到的消息来自 配置好的Kafka topic会以flowfiles的形式排队。
  • 右键单击流文件排队的关系,然后 点击List Queue即可进入队列。
  • 点击任意项目 队列,将出现一个上下文菜单。单击 View 按钮,您 可以看到数据。

整个"viewing" Kafka消息的解释只是为了帮助你调试和开始使用NiFi。理想情况下,您将使用其他 NiFi 处理器来处理您的用例。

示例

您收到来自 Kafka 的消息并希望将其写入 MongoDB,因此您可以将流程设置为: 注意: 有基于记录的处理器,如 ConsumeKafkaRecordPutMongoRecord,但它们基本上做同样的事情,但有更多增强。由于您是新手,我建议了一个简单的流程。您可以找到有关基于 Record 的处理器 here 的详细信息并尝试一下。