消息预处理(主题 - 主题)- Kafka Connect API vs. Streams vs Kafka Consumer?
Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?
我们需要对从一个主题到另一个主题的每条消息(decrypt/re-encrypt 使用不同的键)进行一些预处理。
我一直在研究使用 Kafka Connect,因为它提供了很多开箱即用的好东西(配置管理、偏移量存储、错误处理等)。
但我也觉得我会结束实施 SourceConnector
和 SinkConnector
只是在两个主题之间移动数据,而这些接口都不是为了 Topic A -> (Connector) -> Topic B
。这是正确的方法吗?我应该只使用 SinkConnector
并让我的 SourceTask.put()
执行所有逻辑来写入 Kafka 吗?
其他选项是 KafkaConsumer/Producer
和/或 Streams,但这些将需要自己的实例来 运行 逻辑,而不是偏移重试错误处理。
provides a lot of good things out of the box (config management, offset storage, error handling, etc.)
配置管理不应该比重新部署应用程序更难,但这取决于任何版本控制或您可能有或没有的 CI/CD 管道。
Kafka Producer/Consumer 和 Streams 提供偏移量管理,您只需将其配置为执行默认设置以外的任何操作。
错误处理已得到充分记录,如果您关心检测错误,请不要一劳永逸。 Connect本身会在严重错误情况下停止消费和生产,不会重试或跳过消息。
Neither of those interfaces are meant to do Topic A -> (Connector) -> Topic B
"
你见过Confluent Replicator(正版)吗?那是两个主题之间的Kafka Connect。
不然你见过MirrorMaker吗?这是一个生产者-消费者对,通常用于在各个集群之间复制数据,但可以与相同的源和目标设置一起使用。您只需要确保您没有创建反馈循环。您需要在上面应用 "custom logic"(并更改主题名称),called a Handler class that is placed on your Kafka classpath
bin/kafka-mirror-maker.sh
...
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
Confluent MirrorMaker documentation
Kafka MirrorMaker documentation
没有什么能阻止您实现 Connect API,并且它可能比没有外部集群管理器的 Kafka Streams 应用程序更容易管理。另外,由于 Connect 是一个 Java 库,理论上您可以在其中内部使用 Streams 库。
我们需要对从一个主题到另一个主题的每条消息(decrypt/re-encrypt 使用不同的键)进行一些预处理。
我一直在研究使用 Kafka Connect,因为它提供了很多开箱即用的好东西(配置管理、偏移量存储、错误处理等)。
但我也觉得我会结束实施 SourceConnector
和 SinkConnector
只是在两个主题之间移动数据,而这些接口都不是为了 Topic A -> (Connector) -> Topic B
。这是正确的方法吗?我应该只使用 SinkConnector
并让我的 SourceTask.put()
执行所有逻辑来写入 Kafka 吗?
其他选项是 KafkaConsumer/Producer
和/或 Streams,但这些将需要自己的实例来 运行 逻辑,而不是偏移重试错误处理。
provides a lot of good things out of the box (config management, offset storage, error handling, etc.)
配置管理不应该比重新部署应用程序更难,但这取决于任何版本控制或您可能有或没有的 CI/CD 管道。
Kafka Producer/Consumer 和 Streams 提供偏移量管理,您只需将其配置为执行默认设置以外的任何操作。
错误处理已得到充分记录,如果您关心检测错误,请不要一劳永逸。 Connect本身会在严重错误情况下停止消费和生产,不会重试或跳过消息。
Neither of those interfaces are meant to do
Topic A -> (Connector) -> Topic B
"
你见过Confluent Replicator(正版)吗?那是两个主题之间的Kafka Connect。
不然你见过MirrorMaker吗?这是一个生产者-消费者对,通常用于在各个集群之间复制数据,但可以与相同的源和目标设置一起使用。您只需要确保您没有创建反馈循环。您需要在上面应用 "custom logic"(并更改主题名称),called a Handler class that is placed on your Kafka classpath
bin/kafka-mirror-maker.sh
...
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
Confluent MirrorMaker documentation
Kafka MirrorMaker documentation
没有什么能阻止您实现 Connect API,并且它可能比没有外部集群管理器的 Kafka Streams 应用程序更容易管理。另外,由于 Connect 是一个 Java 库,理论上您可以在其中内部使用 Streams 库。