Kafka Connect 不支持主题策略

Kafka Connect not working with Subject Strategies

上下文

我编码了几个小 Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro

我使用 fast-data-dev Docker image provided by Landoop

将它们部署到本地 Kafka 环境中

基本设置有效并每秒生成一条消息记录

不过,我想换subject name strategy。默认一个生成两个主题:

根据我的用例,我需要生成具有不同模式的事件,这些事件将以同一主题结束。因此,我需要的主题名称是:

根据the docs, my needs fits into the TopicRecordNameStrategy

我尝试了什么

我创建了 avroData 对象来发送连接值:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

然后使用它来创建 SourceRecord 响应对象

The documentation 指出,为了在 Kafka Connect 中使用 Schema Registry,我必须在连接器配置中设置一些属性。因此,当我创建它时,我添加了它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key${topic}-value 主题。

问题

Kafka Connect 应该支持不同的主题策略。我设法通过编写自己的 AvroConverter 版本并硬编码主题策略是我需要的策略来解决这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了这个主题,所以有一个旧名称的版本 (${topic}-key) 并且它有效

将主题策略指定为 Kafka Connect 的正确设置是什么?

您缺少 key.convertervalue.converter 前缀,以便将配置传递给转换器。所以而不是:

key.subject.name.strategy
value.subject.name.strategy

你想要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源https://docs.confluent.io/current/connect/managing/configuring.html

To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.