Kafka Connect 不支持主题策略
Kafka Connect not working with Subject Strategies
上下文
我编码了几个小 Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro。
我使用 fast-data-dev Docker image provided by Landoop
将它们部署到本地 Kafka 环境中
基本设置有效并每秒生成一条消息记录
不过,我想换subject name strategy。默认一个生成两个主题:
${topic}-key
${topic}-value
根据我的用例,我需要生成具有不同模式的事件,这些事件将以同一主题结束。因此,我需要的主题名称是:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
根据the docs, my needs fits into the TopicRecordNameStrategy
我尝试了什么
我创建了 avroData
对象来发送连接值:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后使用它来创建 SourceRecord
响应对象
The documentation 指出,为了在 Kafka Connect 中使用 Schema Registry,我必须在连接器配置中设置一些属性。因此,当我创建它时,我添加了它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key
和 ${topic}-value
主题。
问题
Kafka Connect 应该支持不同的主题策略。我设法通过编写自己的 AvroConverter
版本并硬编码主题策略是我需要的策略来解决这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了这个主题,所以有一个旧名称的版本 (${topic}-key
) 并且它有效
将主题策略指定为 Kafka Connect 的正确设置是什么?
您缺少 key.converter
和 value.converter
前缀,以便将配置传递给转换器。所以而不是:
key.subject.name.strategy
value.subject.name.strategy
你想要:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源https://docs.confluent.io/current/connect/managing/configuring.html:
To pass configuration parameters to key and value converters, prefix them with key.converter.
or value.converter.
as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter
or value.converter
properties.
上下文
我编码了几个小 Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro。
我使用 fast-data-dev Docker image provided by Landoop
将它们部署到本地 Kafka 环境中基本设置有效并每秒生成一条消息记录
不过,我想换subject name strategy。默认一个生成两个主题:
${topic}-key
${topic}-value
根据我的用例,我需要生成具有不同模式的事件,这些事件将以同一主题结束。因此,我需要的主题名称是:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
根据the docs, my needs fits into the TopicRecordNameStrategy
我尝试了什么
我创建了 avroData
对象来发送连接值:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后使用它来创建 SourceRecord
响应对象
The documentation 指出,为了在 Kafka Connect 中使用 Schema Registry,我必须在连接器配置中设置一些属性。因此,当我创建它时,我添加了它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key
和 ${topic}-value
主题。
问题
Kafka Connect 应该支持不同的主题策略。我设法通过编写自己的 AvroConverter
版本并硬编码主题策略是我需要的策略来解决这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了这个主题,所以有一个旧名称的版本 (${topic}-key
) 并且它有效
将主题策略指定为 Kafka Connect 的正确设置是什么?
您缺少 key.converter
和 value.converter
前缀,以便将配置传递给转换器。所以而不是:
key.subject.name.strategy
value.subject.name.strategy
你想要:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源https://docs.confluent.io/current/connect/managing/configuring.html:
To pass configuration parameters to key and value converters, prefix them with
key.converter.
orvalue.converter.
as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in thekey.converter
orvalue.converter
properties.