java.lang.Instantiation 将字节流反序列化为 Scala case class 对象时出现异常

java.lang.Instantiation Exception while deserializing a byte stream into a Scala case class object

我正在尝试将 avro 字节流反序列化为 scala case class 对象。基本上,我有一个带有 avro 编码数据流的 kafka 流,现在模式增加了一个,我正在尝试更新 scala 案例 class 以包含新字段。这个案例 class 看起来像这样

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String] = None
               )  {

this() = this("na", "na", "na", 0, None) }

avro架构如下:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
     "type": ["null", "string"],
      "default": null
    }]}

收到数据后出现以下异常:

java.lang.RuntimeException: java.lang.InstantiationException

我可以很好地接收用 python 编写的消费者的数据,所以我知道数据正在以正确的格式正确流式传输。 我怀疑问题出在 case class 构造函数的创建上,我试过这样做:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String]
               )  {
this() = this("na", "na", "na", 0, some("na"))
}

但运气不好。

解串器代码为(摘录):

// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)

我找不到任何其他具有用于反序列化 avro 的 case classes 的构造函数的示例,我去年发布了一个相关问题 并且基于我的回复能够实现我当前的代码,从那以后一直运行良好。

我采用完全不同的方法解决了这个问题。我使用了本例 https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also have a Confluent schema registry which is really easy to setup using the containerized all in one solution that comes with kafka and a schema registry https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html 中提供的 Confluent Kafka 客户端。

我不得不在我的 pom.xml 文件中添加 confluent 依赖项和 repo。这进入存储库部分。

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

这进入依赖部分:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

使用 https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala 中提供的代码,我能够与 Confluent 模式注册表对话,然后根据 avro 消息中的模式 ID header 从模式 reg 下载模式并提供我返回了一个 GenericRecord object,从中我可以轻松地获得任何和所有感兴趣的领域,并创建 DeviceData object.

的新 DataStream
val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    x.get("new_field").toString)})

融合的 kafka 客户端负责根据模式反序列化 avro 字节流,包括默认值。设置模式注册表并使用融合的 kafka 客户端可能只需要一点时间来适应,但可能是更好的长期解决方案,只需我的 2 美分。