Kafka Streams - 重试消息
Kafka Streams - Retrying a message
我有一个具有外部依赖性的 Kafka 流应用程序。如果依赖项不可用,我想稍后重新处理消息。我无法控制偏移量,因为流是在内部进行的。最好的方法是什么?
您需要将消息放入存储区,稍后将其从存储区中取出并重试。
您可以在常规处理期间或通过安排标点符号来重试。
据我了解 Kafka Streams 和整个 Confluent Platform 架构,您不应该直接从 Kafka Streams 应用程序与任何外部资源通信。基本概念之一是 Kafka Steams 应用程序输入和输出只是 Kafka 主题。如果需要,与所有其他外部资源的通信应由 Kafka Connect. There is a lot different connectors made by Confluent and community, you can even write your own implementation 完成。
在这种方法中,您不需要自己实施重试。另一件事是 Kafka Streams 中的消息处理不会被任何长期存在的 IO 操作阻塞,这可能会对流拓扑的其他组件产生负面影响。所有阻塞操作和重试都将在专为此类操作设计的 Kafka Connect 连接器中完成。连接器应该是容错的并保证交付。
这是来自 Confluent blog which shows described approach
的简单图表
我有一个具有外部依赖性的 Kafka 流应用程序。如果依赖项不可用,我想稍后重新处理消息。我无法控制偏移量,因为流是在内部进行的。最好的方法是什么?
您需要将消息放入存储区,稍后将其从存储区中取出并重试。
您可以在常规处理期间或通过安排标点符号来重试。
据我了解 Kafka Streams 和整个 Confluent Platform 架构,您不应该直接从 Kafka Streams 应用程序与任何外部资源通信。基本概念之一是 Kafka Steams 应用程序输入和输出只是 Kafka 主题。如果需要,与所有其他外部资源的通信应由 Kafka Connect. There is a lot different connectors made by Confluent and community, you can even write your own implementation 完成。
在这种方法中,您不需要自己实施重试。另一件事是 Kafka Streams 中的消息处理不会被任何长期存在的 IO 操作阻塞,这可能会对流拓扑的其他组件产生负面影响。所有阻塞操作和重试都将在专为此类操作设计的 Kafka Connect 连接器中完成。连接器应该是容错的并保证交付。
这是来自 Confluent blog which shows described approach