修改 Kafka 消息内容
Modify Kafka Message Content
我们无法控制发送到 Kafka 的消息的生产者。但是,我们需要将这些消息的内容以约定的特定 JSON 数组结构复制到我们客户的 Kafka 主题。
我们可以在哪个阶段以及如何进行干预以修改数据以匹配所需的结构?
我们可以在接收到的消息在我们的 Kafka 集群上持久化之前修改它们吗?
当我们从我们的 Kafka 集群复制消息到我们客户的 Kafka 时,我们应该修改消息吗?
应该用KSQL来处理吗?
非常感谢您的帮助
我认为关于何时执行数据转换的问题更像是你应该根据你的上下文回答的问题;但以下要点应该可以帮助您做出决定,详细信息是如何做的。
“我们可以在收到的消息在我们的 Kafka 集群上持久化之前修改它们吗?” -- 虽然我知道您无法控制生产者,但您至少可以进行调整设置为 KafkaProducer
实例的属性并注册一个 ProducerInterceptor 可以获取消息有效负载,执行反转换并发送到其他地方。这种方法的缺点是您仍然需要更改生产者代码,并且您将向另一个集群插入标注,这可能会因网络往返而降低吞吐量。我个人不推荐这样做,但这是可能的。
“当我们将消息从我们的 Kafka 集群复制到我们客户的 Kafka 时,我们是否应该修改消息?” -- 是的。我认为这可能是最好的方法,因为在遭受任何突变之前确保数据最终被记录在您的集群中(因此您可以 replay/reprocess 任意多次)。为了实现这一点,您可以使用 Kafka Connect,它提供了一组可插入的工具来执行消息转换——特别是 Single Message Transformations 和 Converters。 @rmoff 写了一篇很好的博客来解释它是如何工作的:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained.
"要不要用KSQL来处理?" -- 是的。我强烈推荐,因为 ksqlDB 充当数据流之上的流处理器。使用 ksqlDB,您可以为可能的转换语义提供强大的支持,这远远超过简单的 per-record/per-aggregation 转换,并使您能够从字面上提出一个新的数据模型,该模型可用于分发下游处理器期望的数据.这是我的推荐。
无论您选择哪个选项,如果您的想法是让数据在另一个不同于您的 Kafka 集群中可用,那么使用 MirrorMaker 或 Confluent Replicator 似乎是一个不错的选择。否则,您可以简单地利用 Kafka Connect 和一些专门的接收器连接器从您的集群读取数据并发送到不需要 Kafka 的目标系统。
我们无法控制发送到 Kafka 的消息的生产者。但是,我们需要将这些消息的内容以约定的特定 JSON 数组结构复制到我们客户的 Kafka 主题。
我们可以在哪个阶段以及如何进行干预以修改数据以匹配所需的结构?
我们可以在接收到的消息在我们的 Kafka 集群上持久化之前修改它们吗?
当我们从我们的 Kafka 集群复制消息到我们客户的 Kafka 时,我们应该修改消息吗?
应该用KSQL来处理吗?
非常感谢您的帮助
我认为关于何时执行数据转换的问题更像是你应该根据你的上下文回答的问题;但以下要点应该可以帮助您做出决定,详细信息是如何做的。
“我们可以在收到的消息在我们的 Kafka 集群上持久化之前修改它们吗?” -- 虽然我知道您无法控制生产者,但您至少可以进行调整设置为 KafkaProducer
实例的属性并注册一个 ProducerInterceptor 可以获取消息有效负载,执行反转换并发送到其他地方。这种方法的缺点是您仍然需要更改生产者代码,并且您将向另一个集群插入标注,这可能会因网络往返而降低吞吐量。我个人不推荐这样做,但这是可能的。
“当我们将消息从我们的 Kafka 集群复制到我们客户的 Kafka 时,我们是否应该修改消息?” -- 是的。我认为这可能是最好的方法,因为在遭受任何突变之前确保数据最终被记录在您的集群中(因此您可以 replay/reprocess 任意多次)。为了实现这一点,您可以使用 Kafka Connect,它提供了一组可插入的工具来执行消息转换——特别是 Single Message Transformations 和 Converters。 @rmoff 写了一篇很好的博客来解释它是如何工作的:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained.
"要不要用KSQL来处理?" -- 是的。我强烈推荐,因为 ksqlDB 充当数据流之上的流处理器。使用 ksqlDB,您可以为可能的转换语义提供强大的支持,这远远超过简单的 per-record/per-aggregation 转换,并使您能够从字面上提出一个新的数据模型,该模型可用于分发下游处理器期望的数据.这是我的推荐。
无论您选择哪个选项,如果您的想法是让数据在另一个不同于您的 Kafka 集群中可用,那么使用 MirrorMaker 或 Confluent Replicator 似乎是一个不错的选择。否则,您可以简单地利用 Kafka Connect 和一些专门的接收器连接器从您的集群读取数据并发送到不需要 Kafka 的目标系统。