我们如何为 Spring Cloud Stream Kafka 生产者、消费者和 KStreams 中的模式配置 value.subject.name.strategy?
How can we configure value.subject.name.strategy for schemas in Spring Cloud Stream Kafka producers, consumers and KStreams?
我想在 Spring Cloud Stream Producers、Consumers 和 KStreams 中自定义 Avro 模式主题的命名策略。
这将在 Kafka 中使用属性 key.subject.name.strategy
和 value.subject.name.strategy
-> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
完成
在本地 Kafka Producer 中,这有效:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
但是我在 Spring Cloud Stream 中找不到如何执行此操作。到目前为止,我已经在制作人中尝试过这个:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
显然 Spring Cloud 使用自己的主题命名策略和接口 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
并且只有一个子类:DefaultSubjectNamingStrategy
.
是否有配置 value.subject.name.strategy
的声明方式,或者我们是否希望提供我们自己的 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
实现和 属性 spring.cloud.stream.schema.avro.subject-naming-strategy
?
您可以在您的属性中将其声明为
spring.cloud.stream.schema.avro.subjectNamingStrategy=MyStrategy
其中 MyStrategy 是接口的实现。例如
object MyStrategy: SubjectNamingStrategy {
override fun toSubject(schema: Schema): String = schema.fullName
}
正如在另一个答案中指出的那样,有一个 dedicated property、spring.cloud.stream.schema.avro.subjectNamingStrategy
,它允许为 Kafka 生产者设置不同的命名策略。
我贡献了提供开箱即用功能的 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
。
在 Kafka Streams 和原生 serialization/deserialization 的情况下(Spring Cloud Streams 3.0.0+ 的默认行为)你必须使用 Confluent 的实现(io.confluent.kafka.serializers.subject.RecordNameStrategy
) 和原生属性:
spring:
application:
name: shipping-service
cloud:
stream:
...
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
...
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
我想在 Spring Cloud Stream Producers、Consumers 和 KStreams 中自定义 Avro 模式主题的命名策略。
这将在 Kafka 中使用属性 key.subject.name.strategy
和 value.subject.name.strategy
-> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
在本地 Kafka Producer 中,这有效:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
但是我在 Spring Cloud Stream 中找不到如何执行此操作。到目前为止,我已经在制作人中尝试过这个:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
显然 Spring Cloud 使用自己的主题命名策略和接口 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
并且只有一个子类:DefaultSubjectNamingStrategy
.
是否有配置 value.subject.name.strategy
的声明方式,或者我们是否希望提供我们自己的 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
实现和 属性 spring.cloud.stream.schema.avro.subject-naming-strategy
?
您可以在您的属性中将其声明为
spring.cloud.stream.schema.avro.subjectNamingStrategy=MyStrategy
其中 MyStrategy 是接口的实现。例如
object MyStrategy: SubjectNamingStrategy {
override fun toSubject(schema: Schema): String = schema.fullName
}
正如在另一个答案中指出的那样,有一个 dedicated property、spring.cloud.stream.schema.avro.subjectNamingStrategy
,它允许为 Kafka 生产者设置不同的命名策略。
我贡献了提供开箱即用功能的 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
。
在 Kafka Streams 和原生 serialization/deserialization 的情况下(Spring Cloud Streams 3.0.0+ 的默认行为)你必须使用 Confluent 的实现(io.confluent.kafka.serializers.subject.RecordNameStrategy
) 和原生属性:
spring:
application:
name: shipping-service
cloud:
stream:
...
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
...
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy