如何默认从Kafka Spring Cloud Stream消费,同时消费一条confluent API生成的Kafka消息?
How to consume from Kafka Spring Cloud Stream by default and also consume a Kafka message generated by the confluent API?
我正在构建一个默认使用 Spring 其他 (SCS) 组件生成的 Cloud Stream (SCS) Kafka 消息的微服务组件。
但我还需要使用来自其他使用融合 API 的组件的 Kafka 消息。
我有一个示例存储库,显示了我正在尝试做的事情。
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
这是下面带有 SCS 输入绑定和合流输入绑定的应用程序配置。
spring:
application:
name: kafka
kafka:
consumer:
properties.schema.registry.url: http://192.168.99.100:8081
cloud:
stream:
kafka:
binder:
brokers: PLAINTEXT://192.168.99.100:9092
# configuration:
# specific:
# avro:
# reader: true
# key:
# deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value:
# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
inputConfluent:
contentType: application/*+avro
destination: confluent-destination
group: input-confluent-group
inputScs:
contentType: application/*+avro
destination: scs-destination
group: input-scs-group
通过上述配置,我得到了使用 SCS 默认配置创建的两个消费者
例如 class org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。
如果我删除上述配置中的注释,我会同时收到从我的 Confluent 客户端发送的配置的消费者
例如 class io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。
我明白,因为配置在 Kafka 绑定器上,它将应用于使用该绑定器定义的所有消费者。
有什么方法可以定义那些特定的属性,以便它们仅适用于融合的特定消费者绑定,而所有其他输入绑定都可以使用默认的 SCS 配置?
您可以通过 configuration
属性.
设置绑定特定的消费者和生产者属性
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz
使用非标准 serializers/deserializers 时,您必须分别为生产者和消费者设置 useNativeEncoding
和 useNativeDecoding
。再次,请参阅参考手册。
我正在构建一个默认使用 Spring 其他 (SCS) 组件生成的 Cloud Stream (SCS) Kafka 消息的微服务组件。
但我还需要使用来自其他使用融合 API 的组件的 Kafka 消息。
我有一个示例存储库,显示了我正在尝试做的事情。
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
这是下面带有 SCS 输入绑定和合流输入绑定的应用程序配置。
spring:
application:
name: kafka
kafka:
consumer:
properties.schema.registry.url: http://192.168.99.100:8081
cloud:
stream:
kafka:
binder:
brokers: PLAINTEXT://192.168.99.100:9092
# configuration:
# specific:
# avro:
# reader: true
# key:
# deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value:
# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
inputConfluent:
contentType: application/*+avro
destination: confluent-destination
group: input-confluent-group
inputScs:
contentType: application/*+avro
destination: scs-destination
group: input-scs-group
通过上述配置,我得到了使用 SCS 默认配置创建的两个消费者 例如 class org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。
如果我删除上述配置中的注释,我会同时收到从我的 Confluent 客户端发送的配置的消费者 例如 class io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。
我明白,因为配置在 Kafka 绑定器上,它将应用于使用该绑定器定义的所有消费者。
有什么方法可以定义那些特定的属性,以便它们仅适用于融合的特定消费者绑定,而所有其他输入绑定都可以使用默认的 SCS 配置?
您可以通过 configuration
属性.
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz
使用非标准 serializers/deserializers 时,您必须分别为生产者和消费者设置 useNativeEncoding
和 useNativeDecoding
。再次,请参阅参考手册。