SerializationException: Error deserializing Avro message

I get an error when creating a Kafka JdbcSinkConnector (my task is to move data from a Kafka topic into a Postgres table):

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

What does id -1 mean?

Connector settings:

{
  "name": "MVM Test",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "mvm_test_events"
  ],
  "connection.url": "jdbc:connection",
  "connection.user": "user",
  "connection.password": "*************"
}

I have also defined the (value) schema for the topic "mvm_test_events" in Control Center:

{
  "type": "record",
  "name": "event",
  "namespace": "mvm",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "series_storage",
      "type": "int"
    },
    {
      "name": "type",
      "type": "int"
    },
    {
      "name": "entity_id",
      "type": "int"
    },
    {
      "name": "processing_ts",
      "type": "double"
    },
    {
      "name": "from_ts",
      "type": "double"
    },
    {
      "name": "to_ts",
      "type": "string"
    },
    {
      "name": "context",
      "type": {
        "type": "record",
        "name": "context",
        "fields": [
          {
            "name": "trainName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Error log:

> [2020-01-22 06:45:10,380] ERROR Error encountered in task
> mvm-test-events-0. Executing stage 'VALUE_CONVERTER' with class
> 'io.confluent.connect.avro.AvroConverter', where consumed record is
> {topic='mvm_test_events', partition=0, offset=14,
> timestamp=1579615711794, timestampType=CreateTime}.
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
> in error handler  at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 13 more Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task is being killed and will not
> recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask)

As far as I understand, it tries to convert the records in the topic using io.confluent.connect.avro.AvroConverter. Should I now point the connector settings (the "Value converter class" field) at the schema I described in the topic settings?

You get the errors

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

when you use the AvroConverter in Kafka Connect to read data that has not actually been serialized as Avro.
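For context, the AvroConverter expects every message to start with the Confluent wire format: a single magic byte (0x0), then a 4-byte Schema Registry ID, then the Avro-encoded payload. As far as I can tell, the -1 in "Error deserializing Avro message for id -1" is just the deserializer's "no ID read yet" placeholder, reported because the magic-byte check fails before a real schema ID can be read. A minimal Java sketch of that framing (the class and method names here are purely illustrative):

import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Rough sketch of the framing the AvroConverter expects on every message.
    static void inspect(byte[] messageBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(messageBytes);
        byte magicByte = buffer.get();   // must be 0x0; anything else -> "Unknown magic byte!"
        int schemaId = buffer.getInt();  // 4-byte Schema Registry ID (big-endian)
        System.out.printf("magic=%d, schemaId=%d, payloadBytes=%d%n",
                magicByte, schemaId, buffer.remaining());
        // The remaining bytes are the Avro-encoded record itself.
    }
}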

So you either need to fix the producer (and serialize the data properly as Avro), or, if you don't want to use Avro, fix your Kafka Connect connector configuration to use the appropriate converter.
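If the fix belongs on the producer side, the usual approach with the Confluent stack is to serialize with KafkaAvroSerializer, which registers the schema and writes the wire-format header for you. A hedged sketch, assuming the broker and Schema Registry URLs below are placeholders and showing only two of your fields:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder

        // Trimmed-down version of the value schema from the question.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"event\",\"namespace\":\"mvm\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"int\"},"
          + "{\"name\":\"series_storage\",\"type\":\"int\"}]}");

        GenericRecord value = new GenericData.Record(schema);
        value.put("id", 1);
        value.put("series_storage", 2);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            // KafkaAvroSerializer prepends the magic byte and schema ID automatically.
            producer.send(new ProducerRecord<>("mvm_test_events", "1", value));
        }
    }
}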

For more information, see this article.

Edit: Based on your updated question, you do intend to write Avro, so using the AvroConverter is correct. You haven't included it in your connector configuration, so I assume it is already set in your Kafka Connect worker properties ("value.converter": "io.confluent.connect.avro.AvroConverter"). Somehow, though, data that is not Avro has ended up in your topic. I suggest setting up a dead letter queue to route those messages to a new topic for inspection, while letting your sink keep processing the Avro messages; a sketch of both changes follows.
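Concretely, you can make the converter explicit and enable a dead letter queue directly in the connector configuration. These are standard Kafka Connect sink properties added on top of your existing settings; the Schema Registry URL and the DLQ topic name below are placeholders:

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "mvm_test_events_dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true"
}

With "errors.tolerance": "all", records that fail conversion are routed to the DLQ topic instead of killing the task, and the context headers record which stage failed.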