Kafka 连接消费者引用偏移量并存储在消息中
Kafka connect consumer referencing offset and storing in message
如果我使用 kafka-connect 来使用消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件负载一起存储?我想要这些数据来对消息进行排序,并检查是否存在任何差距或检查我收到的消息中是否有重复项。 (例如,如果我的消费者抵消被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写自定义订阅者?
根据有关 Insert Field 转换的文档,您可以使用 offset.field
:
Name Description
offset.field Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
总的来说,您的单一消息转换 (SMT) 配置如下所示:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"
如果这不是您想要的,那么总有一个选项可以创建您的 customised 转换
如果我使用 kafka-connect 来使用消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件负载一起存储?我想要这些数据来对消息进行排序,并检查是否存在任何差距或检查我收到的消息中是否有重复项。 (例如,如果我的消费者抵消被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写自定义订阅者?
根据有关 Insert Field 转换的文档,您可以使用 offset.field
:
Name Description
offset.field Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
总的来说,您的单一消息转换 (SMT) 配置如下所示:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"
如果这不是您想要的,那么总有一个选项可以创建您的 customised 转换