我们如何为 Spring Cloud Stream Kafka 生产者、消费者和 KStreams 中的模式配置 value.subject.name.strategy?

How can we configure value.subject.name.strategy for schemas in Spring Cloud Stream Kafka producers, consumers and KStreams?

我想在 Spring Cloud Stream Producers、Consumers 和 KStreams 中自定义 Avro 模式主题的命名策略。

这将在 Kafka 中使用属性 key.subject.name.strategyvalue.subject.name.strategy -> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy

完成

在本地 Kafka Producer 中,这有效:


private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }

但是我在 Spring Cloud Stream 中找不到如何执行此操作。到目前为止,我已经在制作人中尝试过这个:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

显然 Spring Cloud 使用自己的主题命名策略和接口 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy 并且只有一个子类:DefaultSubjectNamingStrategy.

是否有配置 value.subject.name.strategy 的声明方式,或者我们是否希望提供我们自己的 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy 实现和 属性 spring.cloud.stream.schema.avro.subject-naming-strategy

您可以在您的属性中将其声明为

spring.cloud.stream.schema.avro.subjectNamingStrategy=MyStrategy

其中 MyStrategy 是接口的实现。例如

object MyStrategy: SubjectNamingStrategy {
   override fun toSubject(schema: Schema): String = schema.fullName
}

正如在另一个答案中指出的那样,有一个 dedicated propertyspring.cloud.stream.schema.avro.subjectNamingStrategy,它允许为 Kafka 生产者设置不同的命名策略

我贡献了提供开箱即用功能的 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy

Kafka Streams 和原生 serialization/deserialization 的情况下(Spring Cloud Streams 3.0.0+ 的默认行为)你必须使用 Confluent 的实现(io.confluent.kafka.serializers.subject.RecordNameStrategy) 和原生属性:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      ...
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              ...
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy