我在哪里编写 kafka connect sink 自定义分区程序的代码?
Where do I write the code for kafka connect sink custom partitioner?
这可能是一个非常简单的问题,所以我会提前道歉。
我正在为 kafka 主题添加一个 s3 接收器连接器,这里是 conf 文件:
{
"schemas.enable": "false",
"name": "my-s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": [
"my-topic-name"
],
"errors.deadletterqueue.context.headers.enable": "true",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "2000",
"rotate.schedule.interval.ms": "600000",
"s3.bucket.name": "my-bucket-name",
"s3.object.tagging": "true",
"s3.region": "region",
"s3.part.size": "5242880",
"aws.access.key.id": "****",
"aws.secret.access.key": "****",
"s3.ssea.name": "AES256",
"s3.compression.type": "gzip",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics.dir": "",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"locale": "en-GB",
"timezone": "UTC"
}
这会以 {topic_name}/YYYY/MM/DD/HH/{message}
格式输出消息,我希望密钥只是 YYYY/MM/DD/HH/{message}
。经过一些研究,我发现为了从键中删除主题名称,我必须编写一个自定义分区程序来扩展和覆盖 TimeBasedPartitioner
的部分内容。 (这里有一个例子https://github.com/confluentinc/kafka-connect-storage-cloud/issues/321)
我的问题是我现在不知道该在哪里编写该分区程序的实际代码,它应该放在哪里?基于时间的分区程序似乎 link 到 confluent 拥有的某种注册表,但是自定义分区程序会去哪里,我将如何在连接器的 conf 文件中引用该代码?
您在单独的项目中编写代码,将其编译为 JAR,然后将其放置在每个连接工作者的类路径中。
然后可以从partitioner.class
参考
这可能是一个非常简单的问题,所以我会提前道歉。 我正在为 kafka 主题添加一个 s3 接收器连接器,这里是 conf 文件:
{
"schemas.enable": "false",
"name": "my-s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": [
"my-topic-name"
],
"errors.deadletterqueue.context.headers.enable": "true",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "2000",
"rotate.schedule.interval.ms": "600000",
"s3.bucket.name": "my-bucket-name",
"s3.object.tagging": "true",
"s3.region": "region",
"s3.part.size": "5242880",
"aws.access.key.id": "****",
"aws.secret.access.key": "****",
"s3.ssea.name": "AES256",
"s3.compression.type": "gzip",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"topics.dir": "",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"locale": "en-GB",
"timezone": "UTC"
}
这会以 {topic_name}/YYYY/MM/DD/HH/{message}
格式输出消息,我希望密钥只是 YYYY/MM/DD/HH/{message}
。经过一些研究,我发现为了从键中删除主题名称,我必须编写一个自定义分区程序来扩展和覆盖 TimeBasedPartitioner
的部分内容。 (这里有一个例子https://github.com/confluentinc/kafka-connect-storage-cloud/issues/321)
我的问题是我现在不知道该在哪里编写该分区程序的实际代码,它应该放在哪里?基于时间的分区程序似乎 link 到 confluent 拥有的某种注册表,但是自定义分区程序会去哪里,我将如何在连接器的 conf 文件中引用该代码?
您在单独的项目中编写代码,将其编译为 JAR,然后将其放置在每个连接工作者的类路径中。
然后可以从partitioner.class