使用连接接收器 HDFS 连接器包含来自 Kafka 消息的密钥
Include the key from a Kafka message with a connect sink HDFS connector
我正在使用 Kafka 连接 HDFS 接收器连接器从 kafka 写入 HDFS,它工作正常。我的消息如下所示:
key: my-key
value: {
"name": "helen"
}
我的用例是需要将消息的密钥附加到我发送到 HDFS 的事件中。
问题是密钥没有出现在值负载中,所以我不能使用:
"partitioner.class":
"io.confluent.connect.hdfs.partitioner.FieldPartitioner",
"partition.field.name": "key",
我的问题是如何将密钥添加到发送到 HDFS 的消息中,或者如何根据密钥进行分区?
开箱即用,你不能(S3 Connect 也是如此),只是基于代码编写的方式,而不是 Connect 框架的限制
编辑 - 至少对于 S3 接收器,我认为现在有一个 属性 来包含键
至少,您需要构建此 SMT 并将其添加到您的 Connect worker,这将在写入之前将键、主题和分区全部“移动”到 Connect 记录的“值”中存储
https://github.com/jcustenborder/kafka-connect-transform-archive
我正在使用 Kafka 连接 HDFS 接收器连接器从 kafka 写入 HDFS,它工作正常。我的消息如下所示:
key: my-key
value: {
"name": "helen"
}
我的用例是需要将消息的密钥附加到我发送到 HDFS 的事件中。
问题是密钥没有出现在值负载中,所以我不能使用:
"partitioner.class":
"io.confluent.connect.hdfs.partitioner.FieldPartitioner",
"partition.field.name": "key",
我的问题是如何将密钥添加到发送到 HDFS 的消息中,或者如何根据密钥进行分区?
开箱即用,你不能(S3 Connect 也是如此),只是基于代码编写的方式,而不是 Connect 框架的限制
编辑 - 至少对于 S3 接收器,我认为现在有一个 属性 来包含键
至少,您需要构建此 SMT 并将其添加到您的 Connect worker,这将在写入之前将键、主题和分区全部“移动”到 Connect 记录的“值”中存储
https://github.com/jcustenborder/kafka-connect-transform-archive