java.lang.Instantiation 将字节流反序列化为 Scala case class 对象时出现异常
java.lang.Instantiation Exception while deserializing a byte stream into a Scala case class object
我正在尝试将 avro 字节流反序列化为 scala case class 对象。基本上,我有一个带有 avro 编码数据流的 kafka 流,现在模式增加了一个,我正在尝试更新 scala 案例 class 以包含新字段。这个案例 class 看起来像这样
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String] = None
) {
this() = this("na", "na", "na", 0, None)
}
avro架构如下:
{
"type": "record",
"name": "some_name",
"namespace": "some_namespace",
"fields": [
{
"name": "deviceId",
"type": "string"
},
{
"name": "sw_version",
"type": "string"
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "reading",
"type": "double"
},
{
"name": "new_field",
"type": ["null", "string"],
"default": null
}]}
收到数据后出现以下异常:
java.lang.RuntimeException: java.lang.InstantiationException
我可以很好地接收用 python 编写的消费者的数据,所以我知道数据正在以正确的格式正确流式传输。
我怀疑问题出在 case class 构造函数的创建上,我试过这样做:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String]
) {
this() = this("na", "na", "na", 0, some("na"))
}
但运气不好。
解串器代码为(摘录):
// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
我找不到任何其他具有用于反序列化 avro 的 case classes 的构造函数的示例,我去年发布了一个相关问题 并且基于我的回复能够实现我当前的代码,从那以后一直运行良好。
我采用完全不同的方法解决了这个问题。我使用了本例 https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also have a Confluent schema registry which is really easy to setup using the containerized all in one solution that comes with kafka and a schema registry https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html 中提供的 Confluent Kafka 客户端。
我不得不在我的 pom.xml 文件中添加 confluent 依赖项和 repo。这进入存储库部分。
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
这进入依赖部分:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<!-- For Confluent Platform 5.2.1 -->
<version>5.2.1</version>
</dependency>
使用 https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala 中提供的代码,我能够与 Confluent 模式注册表对话,然后根据 avro 消息中的模式 ID header 从模式 reg 下载模式并提供我返回了一个 GenericRecord object,从中我可以轻松地获得任何和所有感兴趣的领域,并创建 DeviceData object.
的新 DataStream
val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
properties)
val device_data_stream = env
.addSource(kafka_consumer)
.map({x => new DeviceData(x.get("deviceId").toString,
x.get("sw_version").toString,
x.get("timestamp").toString,
x.get("reading").toString.toDouble,
x.get("new_field").toString)})
融合的 kafka 客户端负责根据模式反序列化 avro 字节流,包括默认值。设置模式注册表并使用融合的 kafka 客户端可能只需要一点时间来适应,但可能是更好的长期解决方案,只需我的 2 美分。
我正在尝试将 avro 字节流反序列化为 scala case class 对象。基本上,我有一个带有 avro 编码数据流的 kafka 流,现在模式增加了一个,我正在尝试更新 scala 案例 class 以包含新字段。这个案例 class 看起来像这样
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String] = None
) {
this() = this("na", "na", "na", 0, None) }
avro架构如下:
{
"type": "record",
"name": "some_name",
"namespace": "some_namespace",
"fields": [
{
"name": "deviceId",
"type": "string"
},
{
"name": "sw_version",
"type": "string"
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "reading",
"type": "double"
},
{
"name": "new_field",
"type": ["null", "string"],
"default": null
}]}
收到数据后出现以下异常:
java.lang.RuntimeException: java.lang.InstantiationException
我可以很好地接收用 python 编写的消费者的数据,所以我知道数据正在以正确的格式正确流式传输。 我怀疑问题出在 case class 构造函数的创建上,我试过这样做:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String]
) {
this() = this("na", "na", "na", 0, some("na"))
}
但运气不好。
解串器代码为(摘录):
// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
我找不到任何其他具有用于反序列化 avro 的 case classes 的构造函数的示例,我去年发布了一个相关问题
我采用完全不同的方法解决了这个问题。我使用了本例 https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also have a Confluent schema registry which is really easy to setup using the containerized all in one solution that comes with kafka and a schema registry https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html 中提供的 Confluent Kafka 客户端。
我不得不在我的 pom.xml 文件中添加 confluent 依赖项和 repo。这进入存储库部分。
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
这进入依赖部分:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<!-- For Confluent Platform 5.2.1 -->
<version>5.2.1</version>
</dependency>
使用 https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala 中提供的代码,我能够与 Confluent 模式注册表对话,然后根据 avro 消息中的模式 ID header 从模式 reg 下载模式并提供我返回了一个 GenericRecord object,从中我可以轻松地获得任何和所有感兴趣的领域,并创建 DeviceData object.
的新 DataStreamval kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
properties)
val device_data_stream = env
.addSource(kafka_consumer)
.map({x => new DeviceData(x.get("deviceId").toString,
x.get("sw_version").toString,
x.get("timestamp").toString,
x.get("reading").toString.toDouble,
x.get("new_field").toString)})
融合的 kafka 客户端负责根据模式反序列化 avro 字节流,包括默认值。设置模式注册表并使用融合的 kafka 客户端可能只需要一点时间来适应,但可能是更好的长期解决方案,只需我的 2 美分。