消息预处理(主题 - 主题)- Kafka Connect API vs. Streams vs Kafka Consumer?

Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

我们需要对从一个主题到另一个主题的每条消息(decrypt/re-encrypt 使用不同的键)进行一些预处理。

我一直在研究使用 Kafka Connect,因为它提供了很多开箱即用的好东西(配置管理、偏移量存储、错误处理等)。

但我也觉得我会结束实施 SourceConnectorSinkConnector 只是在两个主题之间移动数据,而这些接口都不是为了 Topic A -> (Connector) -> Topic B。这是正确的方法吗?我应该只使用 SinkConnector 并让我的 SourceTask.put() 执行所有逻辑来写入 Kafka 吗?

其他选项是 KafkaConsumer/Producer 和/或 Streams,但这些将需要自己的实例来 运行 逻辑,而不是偏移重试错误处理。

provides a lot of good things out of the box (config management, offset storage, error handling, etc.)

配置管理不应该比重新部署应用程序更难,但这取决于任何版本控制或您可能有或没有的 CI/CD 管道。

Kafka Producer/Consumer 和 Streams 提供偏移量管理,您只需将其配置为执行默认设置以外的任何操作。

错误处理已得到充分记录,如果您关心检测错误,请不要一劳永逸。 Connect本身会在严重错误情况下停止消费和生产,不会重试或跳过消息。

Neither of those interfaces are meant to do Topic A -> (Connector) -> Topic B"

你见过Confluent Replicator(正版)吗?那两个主题之间的Kafka Connect。

不然你见过MirrorMaker吗?这是一个生产者-消费者对,通常用于在各个集群之间复制数据,但可以与相同的源和目标设置一起使用。您只需要确保您没有创建反馈循环。您需要在上面应用 "custom logic"(并更改主题名称),called a Handler class that is placed on your Kafka classpath

bin/kafka-mirror-maker.sh

...

--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>

Confluent MirrorMaker documentation
Kafka MirrorMaker documentation


没有什么能阻止您实现 Connect API,并且它可能比没有外部集群管理器的 Kafka Streams 应用程序更容易管理。另外,由于 Connect 是一个 Java 库,理论上您可以在其中内部使用 Streams 库。