Avro GenericRecord deserialization not working via SpringKafka
I'm trying to keep my consumer as simple as possible. The problem is that when I look at the records in my Kafka listener:
List<GenericRecord> incomingRecords
the values are just string values. I've tried setting the specific reader to both true and false, and I've set the value deserializer as well. Am I missing something? This worked fine when I was using a Java configuration class, but I'd like to keep everything consolidated in this application.properties file.
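For context, the working Java-config setup was along these lines (a rough sketch, not the exact code; the bootstrap servers and registry URL are placeholders):

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "https://myschemaregistry.com");            // placeholder
        // false => records are deserialized as GenericRecord rather than generated SpecificRecord classes
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}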
application.properties
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_ACCESS_KEY}" password="${SASL_SECRET}";
spring.kafka.consumer.auto-offset-reset=earliest
#### Consumer Properties Configuration
spring.kafka.properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.properties.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
spring.kafka.bootstrap-servers=
spring.kafka.properties.schema.registry.url=
spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.spring.json.trusted.packages=*
logging.level.org.apache.kafka=TRACE
logging.level.io.confluent.kafka.schemaregistry=TRACE
Consumer
@KafkaListener(topics = "${topic}", groupId = "${group}")
public void processMessageBatch(List<GenericRecord> incomingRecords,
                                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                                @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                                @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    currentMicroBatch = Stream.of(currentMicroBatch, incomingRecords)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    if (currentMicroBatch.size() >= maxRecords || validatedElapsedDuration(durationMonitor)) {
        System.out.println("ETL processing logic will be done here");
    }
    clearBatch();
}
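Note that a batch listener signature like this (List parameters) assumes the listener container is in batch mode; with Spring Boot that is typically one more property (assuming Boot 2.2+):

spring.kafka.listener.type=batch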
I noticed that when using:
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
I get the following error:
2020-12-02 17:04:42.745 DEBUG 51910 --- [ntainer#0-0-C-1] i.c.k.s.client.rest.RestService : Sending GET with input null to https://myschemaregistry.com
2020-12-02 17:04:42.852 ERROR 51910 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-avro-32 at offset 7836. If needed, please seek past the record to continue consumption.
java.lang.IllegalArgumentException: argument "src" is null
at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4735)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3502)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:270)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:573)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:557)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:149)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:241)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:124)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1062)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1018)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
I found the issue. After digging through the debug logs of Confluent's schema-registry REST client, I saw it was getting a 401 (terrible logging, by the way).
I needed to add this:
spring.kafka.properties.basic.auth.credentials.source=SASL_INHERIT
since I'm using SASL authentication and needed the registry client to inherit the SASL config I added above. Fun stuff..
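Putting it together, the registry-related properties end up looking like this (the URL is a placeholder; SASL_INHERIT makes the registry client reuse the username/password from sasl.jaas.config for its basic auth):

spring.kafka.properties.schema.registry.url=https://myschemaregistry.com
spring.kafka.properties.basic.auth.credentials.source=SASL_INHERIT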
Had the same issue. For me, I just had to change the schema registry URL's protocol from http:// to https:// and it worked. @Ryan's answer gave me the clue.
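In properties terms, the change was just (host taken from the logs above, as a placeholder):

# fails against a TLS-only registry, surfacing as a confusing deserialization error
spring.kafka.properties.schema.registry.url=http://myschemaregistry.com
# works
spring.kafka.properties.schema.registry.url=https://myschemaregistry.com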