Spring Cloud Stream 应用程序中的 timestampExtractorBeanName 设置不会覆盖默认值
timestampExtractorBeanName setting in the Spring Cloud Stream application doesn't override default value
我使用 Kafka Streams Binder 的 Spring Cloud Stream 应用程序具有以下属性:
spring.cloud.stream.bindings:
windowStream-in-0:
destination: input
windowStream-out-0:
destination: window1
hint1Stream-in-0:
destination: window1
hint1Stream-out-0:
destination: hints
realityStream-in-0:
destination: input
realityStream-in-1:
destination: window1
consumer:
timestampExtractorBeanName: anotherTimestampExtractor
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
spring.cloud.stream.kafka.streams:
default:
consumer:
timestampExtractorBeanName: timestampExtractor
binder:
functions:
windowStream:
applicationId: mock-stream-window1
hint1Stream:
applicationId: mock-stream-hints
realityStream:
applicationId: mock-stream-reality
countStream:
applicationId: mock-stream-count
stateStoreRetry:
maxAttempts: 3
backOffInterval: 1000
configuration:
schema.registry.url: mock://mock-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
我想要做的是对所有流使用 "timestampExtractor",除了一个名为 "realityStream" 的流。
为此,我将 spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName
设置为 timestampExtractor
,然后通过设置 spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName
尝试 "override" "realityStream"
不幸的是,我的重写似乎不起作用,因为只有 "timestampExtractor" 被调用,正如我在调试器(以及我的测试结果)中看到的那样。
是我应用的配置有误,还是我的期望有误?
这是我的单个 Spring Cloud Streams 应用程序中的流图片:
(橙色圆圈是我要应用非默认时间戳提取器的地方)
覆盖在错误的地方;它需要在 ....kafka.streams.bindings.realityStream-in-1....
属性.
下
这是kafka特有的属性;你在通用绑定属性中有它(所有绑定器共有)。
我使用 Kafka Streams Binder 的 Spring Cloud Stream 应用程序具有以下属性:
spring.cloud.stream.bindings:
windowStream-in-0:
destination: input
windowStream-out-0:
destination: window1
hint1Stream-in-0:
destination: window1
hint1Stream-out-0:
destination: hints
realityStream-in-0:
destination: input
realityStream-in-1:
destination: window1
consumer:
timestampExtractorBeanName: anotherTimestampExtractor
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
spring.cloud.stream.kafka.streams:
default:
consumer:
timestampExtractorBeanName: timestampExtractor
binder:
functions:
windowStream:
applicationId: mock-stream-window1
hint1Stream:
applicationId: mock-stream-hints
realityStream:
applicationId: mock-stream-reality
countStream:
applicationId: mock-stream-count
stateStoreRetry:
maxAttempts: 3
backOffInterval: 1000
configuration:
schema.registry.url: mock://mock-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
我想要做的是对所有流使用 "timestampExtractor",除了一个名为 "realityStream" 的流。
为此,我将 spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName
设置为 timestampExtractor
,然后通过设置 spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName
不幸的是,我的重写似乎不起作用,因为只有 "timestampExtractor" 被调用,正如我在调试器(以及我的测试结果)中看到的那样。
是我应用的配置有误,还是我的期望有误?
这是我的单个 Spring Cloud Streams 应用程序中的流图片:
(橙色圆圈是我要应用非默认时间戳提取器的地方)
覆盖在错误的地方;它需要在 ....kafka.streams.bindings.realityStream-in-1....
属性.
这是kafka特有的属性;你在通用绑定属性中有它(所有绑定器共有)。