KafkaAvroSerializer with multiple Avro registry URLs

We have a KafkaAvroSerde configured with multiple schema registry URLs. At some point the serde timed out while trying to register a schema against one URL, and because it threw an IO exception up to the streams application, the stream thread shut down. From a Kafka Streams app perspective, this kind of defeats the purpose of being able to support multiple URLs when creating the Avro serdes, since the runtime exception bubbling up the DSL API stack will close the stream thread. A few questions:

  1. Is there a good way to handle this?
  2. Do we need to enforce retries in the application logic (which can be tricky when you are simply materializing a topic into a store)?
  3. Otherwise, is there an AvroSerde wrapper that can retry against the actually configured schema registry URLs?
  4. When materializing into a local RocksDB store, is there added value in registering the schema in the registry, or should we set auto.register.schemas to false?
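
For context, a minimal sketch of how such a serde can be configured with multiple registry endpoints (the hosts below are placeholders); the Confluent client accepts a comma-separated list in `schema.registry.url`:

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;

import java.util.Collections;
import java.util.Map;

public class MultiRegistrySerdeSketch {
    public static void main(String[] args) {
        // Comma-separated registry endpoints (placeholder hosts); the client
        // fails over to the next URL when a request to one of them fails.
        Map<String, String> serdeConfig = Collections.singletonMap(
                "schema.registry.url",
                "http://registry-1:8081,http://registry-2:8081");

        Serde<GenericRecord> valueSerde = new GenericAvroSerde();
        valueSerde.configure(serdeConfig, false); // isKey = false: value serde
    }
}
```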

```
Exception in thread "mediafirst-npvr-adapter-program-mapping-mtrl02nsbe02.pf.spop.ca-f5e097bd-ff1b-42da-9f7d-2ab9fa5d2b70-GlobalStreamThread" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ProgramMapp
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at com.bell.cts.commons.kafka.store.custom.CustomStoreProcessor.process(CustomStoreProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl.forward(GlobalProcessorContextImpl.java:52)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:87)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:282)
```

> From a Kafka Streams app perspective, this kind of defeats the purpose of being able to support multiple URLs when creating the Avro serdes, since the runtime exception bubbling up the DSL API stack will close the stream thread.

Here I disagree: from a Kafka Streams perspective, serialization failed, and thus the application does need to shut down. Note that Kafka Streams is agnostic to the serdes you are using, and therefore has no idea that your serde talks to a schema registry and that it could retry.

Thus, the serde itself is responsible for handling retries internally. I am not aware of a wrapper that does this, but it should not be too hard to build one yourself. I will create an internal ticket to track this feature request; I think it makes sense to add this for an out-of-the-box experience.
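
As a sketch of what such a wrapper could look like (the class name, attempt count, and backoff below are made up for illustration, not an existing Confluent API; a production version should distinguish transient registry timeouts from hard failures such as incompatible schemas, which are not worth retrying):

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Hypothetical wrapper: retries the delegate a few times before letting
// the SerializationException bubble up and kill the stream thread.
public class RetryingAvroSerializer implements Serializer<Object> {

    private final Serializer<Object> delegate = new KafkaAvroSerializer();
    private final int maxAttempts = 3;
    private final long backoffMs = 500L;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        SerializationException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.serialize(topic, data);
            } catch (SerializationException e) {
                last = e; // e.g. "Error registering Avro schema" on a registry timeout
                try {
                    Thread.sleep(backoffMs * attempt); // linear backoff between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw last;
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```

A matching deserializer wrapper, plus a Serde pairing the two, would round this out.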

For RocksDB: all records written into RocksDB are also written into the changelog topic. Thus, to allow Kafka Streams to read this data to recover state after an error, you do need to register the schemas.
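
In configuration terms, a sketch (whether auto.register.schemas can safely be set to false depends on whether the changelog schemas are registered by some external process):

```java
import java.util.HashMap;
import java.util.Map;

public class AutoRegisterSketch {
    public static void main(String[] args) {
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url",
                "http://registry-1:8081,http://registry-2:8081");
        // Keep the default (true) so the schemas for changelog records are
        // registered and the store can be restored after a failure; set this
        // to false only if the schemas are registered out of band.
        serdeConfig.put("auto.register.schemas", "true");
    }
}
```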