Apache Kafka JDBC 连接器 - SerializationException:未知的魔法字节
Apache Kafka JDBC Connector - SerializationException: Unknown magic byte
我们正在尝试使用 Confluent JDBC 接收器连接器将主题中的值写回到 postgres 数据库。
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
我们可以使用以下命令读取控制台中的值:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
模式存在并且值被 kafka-avro-console-consumer
正确反序列化,因为它没有给出错误但连接器给出了这些错误:
{
"name": "datawarehouse_sink",
"connector": {
"state": "RUNNING",
"worker_id": "x.x.x.x:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "x.x.x.x:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=13=](WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
}
],
"type": "sink"
}
最后的错误是:
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
架构已在架构注册表中注册。
问题是否出在连接器的配置文件上?
错误 org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
表示有关该主题的消息不是有效的 Avro,无法反序列化。这可能有多种原因:
有些消息是 Avro,有些则不是。如果是这种情况,您可以使用 Kafka Connect 中的错误处理功能来忽略使用如下配置的无效消息:
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
value 是 Avro 但 key 不是。如果是这种情况,则使用适当的 key.converter
.
更多阅读:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
这意味着解串器已经检查了消息的前 5 个字节并发现了一些意外的东西。有关使用序列化程序 here 打包消息的更多信息,请查看 'wire format' 部分。只是猜测消息中的零字节!=0
我们正在尝试使用 Confluent JDBC 接收器连接器将主题中的值写回到 postgres 数据库。
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
我们可以使用以下命令读取控制台中的值:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
模式存在并且值被 kafka-avro-console-consumer
正确反序列化,因为它没有给出错误但连接器给出了这些错误:
{
"name": "datawarehouse_sink",
"connector": {
"state": "RUNNING",
"worker_id": "x.x.x.x:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "x.x.x.x:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=13=](WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
}
],
"type": "sink"
}
最后的错误是:
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
架构已在架构注册表中注册。
问题是否出在连接器的配置文件上?
错误 org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
表示有关该主题的消息不是有效的 Avro,无法反序列化。这可能有多种原因:
有些消息是 Avro,有些则不是。如果是这种情况,您可以使用 Kafka Connect 中的错误处理功能来忽略使用如下配置的无效消息:
"errors.tolerance": "all", "errors.log.enable":true, "errors.log.include.messages":true
value 是 Avro 但 key 不是。如果是这种情况,则使用适当的
key.converter
.
更多阅读:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
这意味着解串器已经检查了消息的前 5 个字节并发现了一些意外的东西。有关使用序列化程序 here 打包消息的更多信息,请查看 'wire format' 部分。只是猜测消息中的零字节!=0