KTable 外键连接与 avro 模式注册表不兼容
KTable foreign key join not compatible with avro schema registry
使用融合 5.4.1
我们在将加入的流从加入的 KTable 转发到另一个主题时,碰巧在新的 KTable 外键加入中遇到了问题。
Schema registry 和 Avro serialiser 相关工件:融合版本 5.4.1 kafkastreams 客户端:5.4.1-ccs
外键跨 Ktables 连接后,在使用“for each”从 ktable 中使用流时,它工作正常,但如果使用“to”将连接流转发到另一个主题,则存在模式注册错误抛出。
将错误中提到的模式与在模式注册表中注册的模式进行比较,它完全相同......似乎 Kafka 2.4.0 已经有类似的问题:https://issues.apache.org/jira/browse/KAFKA-9390 和问题在将其转发到另一个主题时继续存在
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:64)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:236)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static[=10=](ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.run(AbstractJavaResourceMethodDispatcher.java:124)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
at org.glassfish.jersey.server.ServerRuntime.run(ServerRuntime.java:253)
at org.glassfish.jersey.internal.Errors.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:385)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:560)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:501)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:438)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:494)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:432)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:481)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:222)
... 57 more
; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:431) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:423) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:409) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71) ~[kafka-avro-serializer-5.4.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.4.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.4.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.4.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:106) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:124) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier.process(SubscriptionResolverJoinProcessorSupplier.java:107) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier.process(SubscriptionResolverJoinProcessorSupplier.java:60) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-5.4.1-ccs.jar:na]
不好意思,这个问题是因为之前通过另一个编译模块为同一主题注册的架构存在差异...错误不够明显,无法提及 "to" 中的主题,相反,它提到了一个内部主题,这导致了混乱。这不再是问题。
使用融合 5.4.1
我们在将加入的流从加入的 KTable 转发到另一个主题时,碰巧在新的 KTable 外键加入中遇到了问题。
Schema registry 和 Avro serialiser 相关工件:融合版本 5.4.1 kafkastreams 客户端:5.4.1-ccs
外键跨 Ktables 连接后,在使用“for each”从 ktable 中使用流时,它工作正常,但如果使用“to”将连接流转发到另一个主题,则存在模式注册错误抛出。
将错误中提到的模式与在模式注册表中注册的模式进行比较,它完全相同......似乎 Kafka 2.4.0 已经有类似的问题:https://issues.apache.org/jira/browse/KAFKA-9390 和问题在将其转发到另一个主题时继续存在
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:64)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:236)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static[=10=](ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.run(AbstractJavaResourceMethodDispatcher.java:124)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
at org.glassfish.jersey.server.ServerRuntime.run(ServerRuntime.java:253)
at org.glassfish.jersey.internal.Errors.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:385)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:560)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:501)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:438)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:494)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:432)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:481)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:222)
... 57 more
; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:431) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:423) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:409) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172) ~[kafka-schema-registry-client-5.4.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71) ~[kafka-avro-serializer-5.4.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.4.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.4.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.4.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:106) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:124) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier.process(SubscriptionResolverJoinProcessorSupplier.java:107) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier.process(SubscriptionResolverJoinProcessorSupplier.java:60) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474) ~[kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-5.4.1-ccs.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-5.4.1-ccs.jar:na]
不好意思,这个问题是因为之前通过另一个编译模块为同一主题注册的架构存在差异...错误不够明显,无法提及 "to" 中的主题,相反,它提到了一个内部主题,这导致了混乱。这不再是问题。