Deserialization exception with data generated by the ksql-datagen utility
I am generating a sample stream with the ksql-datagen utility from the following schema -
{
"type": "record",
"name": "users",
"namespace": "com.example",
"fields": [
{
"name": "registertime",
"type": {
"type":"long",
"arg.properties":{
"range":{"min":1487715775521,"max":1519273364600}
}
}
},
{
"name": "userid",
"type": {
"type":"string",
"arg.properties":{"regex":"User_[1-9][0-2]"}
}
},
{
"name": "regionid",
"type": {
"type":"string",
"arg.properties":{"regex":"Region_[1-9]"}
}
},
{
"name": "gender",
"type": {
"type":"string",
"arg.properties":{
"options":["MALE","FEMALE","OTHER"]
}
}
}
]}
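To make the `arg.properties` hints in the schema above concrete, here is a minimal Python sketch of the kind of record they describe. It is not how ksql-datagen is implemented; the function name `sample_user` is hypothetical, and treating the upper bound of `range` as exclusive is an assumption.

```python
import random


def sample_user(rng: random.Random) -> dict:
    """Build one record that satisfies the arg.properties hints in the schema."""
    return {
        # "range": {"min": ..., "max": ...}; exclusive upper bound is an assumption
        "registertime": rng.randrange(1487715775521, 1519273364600),
        # "regex": "User_[1-9][0-2]" -> one digit 1-9 followed by one digit 0-2
        "userid": f"User_{rng.randint(1, 9)}{rng.randint(0, 2)}",
        # "regex": "Region_[1-9]"
        "regionid": f"Region_{rng.randint(1, 9)}",
        # "options": one of the listed values
        "gender": rng.choice(["MALE", "FEMALE", "OTHER"]),
    }


# Seeded generator so repeated runs give the same record.
record = sample_user(random.Random(42))
print(record)
```

Note that nothing in these hints touches the record name or namespace; they only shape the field values.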
When I check the registered version, the subject still uses the "io.confluent.ksql.avro_schemas" namespace instead of the one from my schema -
curl "http://localhost:8081/subjects/test-user-value/versions/1"
{"subject":"test-user-value","version":1,"id":4,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
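The mismatch can be checked mechanically. The sketch below, in Python with only the standard library, parses a Schema Registry response like the one above and compares the writer schema's full name with the one from the datagen input schema. The helper names are hypothetical, the `fields` array is emptied for brevity, and the response is pasted in as a string rather than fetched from `localhost:8081`.

```python
import json

# Response body in the shape Schema Registry returns: "schema" is itself a
# JSON string, so its inner quotes are escaped. Fields omitted for brevity.
REGISTRY_RESPONSE = r'''{"subject":"test-user-value","version":1,"id":4,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[]}"}'''


def avro_full_name(schema: dict) -> str:
    """Return the Avro full name (namespace.name) of a record schema."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # A dotted name already carries its namespace.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def registered_full_name(response_body: str) -> str:
    """Decode the envelope, then the embedded schema string."""
    envelope = json.loads(response_body)
    writer_schema = json.loads(envelope["schema"])
    return avro_full_name(writer_schema)


# Name taken from the schema file passed to ksql-datagen.
expected = avro_full_name({"type": "record", "name": "users", "namespace": "com.example"})
actual = registered_full_name(REGISTRY_RESPONSE)

print("expected:", expected)
print("actual:  ", actual)
print("match:   ", expected == actual)
```

A SpecificRecord reader resolves its class from the writer schema's full name, so any difference between the two names here means the generated class for `com.example.users` will not be the one the deserializer looks for.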
I get the following error when trying to consume this topic with the Kafka Streams API -
Exception in thread "PageView-Users-Stream-Join-eg-1dc610a3-c9d9-4c1e-b5eb-910e4bc74826-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class io.confluent.ksql.avro_schemas.KsqlDataSourceSchema specified in writer's schema whilst finding reader's schema for a SpecificRecord.
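Answered at https://github.com/confluentinc/schema-registry/issues/980
Datagen always defines the namespace as io.confluent.ksql.avro_schemas, regardless of the namespace in the schema you pass in. See confluentinc/ksql#1906
That is why the SpecificRecord deserializer looks for io.confluent.ksql.avro_schemas.KsqlDataSourceSchema rather than com.example.users.
There are now other ways to generate test data into Kafka as well.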