即使未能 post 在 Kafka Streams 中输出主题,消费者偏移量是否已提交?
Is consumer offset commited even when failing to post to output topic in Kafka Streams?
如果我有一个 Kafka 流应用程序未能 post 到一个主题(因为该主题不存在),它会提交消费者偏移量并继续,还是会在同一消息上循环直到它可以解决输出主题吗?根据我的观察,该应用程序仅打印错误并运行良好。
尝试 post 主题时的错误示例:
Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}
在我看来,为了不丢失数据,它只会旋转同一封邮件直到问题得到解决?我找不到关于默认行为是什么的明确答案。我们还没有将自动提交设置为关闭或类似的设置,大部分设置都设置为默认值。
我问是因为我们不想在健康检查正常的情况下结束(应用程序 运行 打印错误记录)而我们只是扔掉大量的 Kafka 消息.
Kafka Streams 不会为这种情况提交偏移量,因为它提供了 at-least-once 处理保证(事实上,甚至不可能以不同的方式重新配置 Kafka Streams——只有更强的 exactly-once 保证是可能的)。此外,Kafka Streams 始终在消费者上禁用 auto-commit(并且不允许您启用它),因为 Kafka Streams 自行管理提交偏移量。
如果您 运行 使用默认设置,生产者实际上应该抛出一个异常并且相应的线程应该死亡——如果线程死亡,您可以通过注册 KafkaStreams#uncaughtExceptionHandler()
获得回调。
您还可以观察 KafkaStreams#state()
(或注册回调 KafkaStreams#setStateListener()
)。如果所有线程都死了,状态将转到 DEAD
(注意,旧版本中存在一个错误,在这种情况下状态仍然是 RUNNING
:https://issues.apache.org/jira/browse/KAFKA-5372)
因此,应用程序不应处于健康状态,Kafka Streams 不会重试输入消息而是停止处理,您需要重新启动客户端。重新启动时,它将 re-read 失败的输入消息 re-try 写入输出主题。
如果想让Kafka Streams重试,需要增加生产者配置reties
,避免生产者内部抛出异常重试写入。如果生产者写入缓冲区已满,这可能 "block" 最终会进一步处理。
如果我有一个 Kafka 流应用程序未能 post 到一个主题(因为该主题不存在),它会提交消费者偏移量并继续,还是会在同一消息上循环直到它可以解决输出主题吗?根据我的观察,该应用程序仅打印错误并运行良好。
尝试 post 主题时的错误示例:
Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}
在我看来,为了不丢失数据,它只会旋转同一封邮件直到问题得到解决?我找不到关于默认行为是什么的明确答案。我们还没有将自动提交设置为关闭或类似的设置,大部分设置都设置为默认值。
我问是因为我们不想在健康检查正常的情况下结束(应用程序 运行 打印错误记录)而我们只是扔掉大量的 Kafka 消息.
Kafka Streams 不会为这种情况提交偏移量,因为它提供了 at-least-once 处理保证(事实上,甚至不可能以不同的方式重新配置 Kafka Streams——只有更强的 exactly-once 保证是可能的)。此外,Kafka Streams 始终在消费者上禁用 auto-commit(并且不允许您启用它),因为 Kafka Streams 自行管理提交偏移量。
如果您 运行 使用默认设置,生产者实际上应该抛出一个异常并且相应的线程应该死亡——如果线程死亡,您可以通过注册 KafkaStreams#uncaughtExceptionHandler()
获得回调。
您还可以观察 KafkaStreams#state()
(或注册回调 KafkaStreams#setStateListener()
)。如果所有线程都死了,状态将转到 DEAD
(注意,旧版本中存在一个错误,在这种情况下状态仍然是 RUNNING
:https://issues.apache.org/jira/browse/KAFKA-5372)
因此,应用程序不应处于健康状态,Kafka Streams 不会重试输入消息而是停止处理,您需要重新启动客户端。重新启动时,它将 re-read 失败的输入消息 re-try 写入输出主题。
如果想让Kafka Streams重试,需要增加生产者配置reties
,避免生产者内部抛出异常重试写入。如果生产者写入缓冲区已满,这可能 "block" 最终会进一步处理。