Why is a custom SerDe ignored by a Kafka Streams application (Spring Cloud Stream)?
So, I implemented a custom SerDe that extends Confluent's SpecificAvroSerde in order to retry when communication with the Schema Registry times out. I configured the Spring Cloud Stream Kafka Streams binder to use it as the default:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=com.test.RetrySpecificAvroSerde
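For illustration, the SerDe is along these lines (a simplified sketch, not the exact code; the retry count and the exception handling are placeholders):

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class RetrySpecificAvroSerde<T extends SpecificRecord> extends SpecificAvroSerde<T> {

    private static final int MAX_ATTEMPTS = 3; // placeholder

    // Wrap the parent's serializer so a registry timeout is retried
    // instead of failing the stream task on the first attempt.
    @Override
    public Serializer<T> serializer() {
        Serializer<T> inner = super.serializer();
        return (topic, data) -> {
            SerializationException last = null;
            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
                try {
                    return inner.serialize(topic, data);
                } catch (SerializationException e) {
                    last = e; // e.g. schema-registry register timeout
                }
            }
            throw last;
        };
    }

    // deserializer() is wrapped the same way.
}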
Today I saw this error in the logs:
2020-12-14 01:31:53.006 ERROR 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] task [0_0] Exception caught while punctuating processor 'KSTREAM-TRANSFORM-0000000001'
at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:449) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {...avro json...}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:365) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:357) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:343) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.3.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.3.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.3.3.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.3.3.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) ~[kafka-streams-2.5.0.jar:na]
...
This tells me that the SerDe Kafka Streams is using is not the one I defined above, but the base class SpecificAvroSerde (which wraps SpecificAvroSerializer).
Does this have something to do with the way the Spring Cloud Stream Kafka library tries to automatically infer which SerDe to use? What is the "correct" way to override and set the SerDe?
I see this in your configuration: spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde. That is the default key Serde. Did you mean it to be the value.serde? If so, that needs to change.
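In other words, if the Avro record is the value, the property would be:

spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=com.test.RetrySpecificAvroSerde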
That said, you can also set the Serde on the individual bindings, which takes higher precedence than the binder-wide default.
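For example, on the input binding (process-in-0 here is a placeholder; use your actual binding name):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=com.test.RetrySpecificAvroSerde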
If your Kafka Streams function is strongly typed (i.e., the KStream generic parameters use the actual record types), you can also define a bean of type RetrySpecificAvroSerde in the application. That approach has the highest precedence in the binder.
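For example (a sketch; MyEvent stands in for your actual Avro type, and the registry URL is a placeholder):

import java.util.Collections;
import org.apache.kafka.common.serialization.Serde;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SerdeConfig {

    @Bean
    public Serde<MyEvent> retrySpecificAvroSerde() {
        RetrySpecificAvroSerde<MyEvent> serde = new RetrySpecificAvroSerde<>();
        // Confluent Avro serdes must be given the registry URL before first use.
        serde.configure(
                Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                false); // false = configure as a value serde
        return serde;
    }
}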
After correcting that, if it still doesn't work, please share a small sample application so we can take a look.