在 Scala 中反序列化 Avro 数据时遇到问题
Trouble with deserializing Avro data in Scala
我正在用 Scala 构建一个 Apache Flink 应用程序,它从 Kafka 总线读取流数据,然后对其执行汇总操作。来自 Kafka 的数据是 Avro 格式,需要特殊的反序列化 class。我找到了这个 scala class AvroDeserializationScehema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):
package org.myorg.quickstart
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._
import java.io.IOException
class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
def deserialize(message: Array[Byte]): T = {
ensureInitialized()
try {
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
}
catch {
case e: IOException => {
throw new RuntimeException(e)
}
}
}
def isEndOfStream(nextElement: T): Boolean = false
def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)
private def ensureInitialized() {
if (reader == null) {
if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) {
reader = new SpecificDatumReader[T](avroType)
}
else {
reader = new ReflectDatumReader[T](avroType)
}
}
}
}
在我的流媒体 class 中,我按如下方式使用它:
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
其中 DeviceData 是在同一项目中定义的 Scala 案例class
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double
)
编译 StreamingKafkaClient.scala class
时出现以下错误
Error:(24, 102) object java.lang.Class is not a value
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
也尝试过
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
有了这个我得到了一个不同的错误:
Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x: java.util.regex.Pattern,x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.regex.Pattern,x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.List[String],x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.List[String],x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: String,x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: String,x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
.addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
我是 Scala 的新手(这是我的第一个 Scala 程序)所以我知道我在这里缺少一些基本的东西。当我尝试学习 Scala 时,有人可以指出我做错了什么。我的目的是基本上将 avro 编码的数据从 Kafka 读取到 Flink 中,并对流数据进行一些操作。我找不到任何使用 AvroDeserializationSchema 的示例 class,在我看来,这应该是原生内置到 Flink 包中的东西。
为了在 Scala 中得到一个 class 对象,你需要 classOf[DeviceData]
,而不是 Class[DeviceData]
new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
I could not find any examples of the usage of AvroDeserializationSchema class
此外,看起来在 Flink 1.6 版本中,他们会添加此 class 而不是您从其他地方复制。 FLINK-9337 & FLINK-9338
如评论中所述,如果您想使用 Confluent Avro Schema Registry 而不是给出 class 类型,,或参考上面的代码 Github link
此外,如果您是 运行 Kafka 0.11+(或 Confluent 3.3+),那么理想情况下您应该使用 FlinkKafkaConsumer011
以及您要反序列化的 class
new FlinkKafkaConsumer011[DeviceData]
我正在用 Scala 构建一个 Apache Flink 应用程序,它从 Kafka 总线读取流数据,然后对其执行汇总操作。来自 Kafka 的数据是 Avro 格式,需要特殊的反序列化 class。我找到了这个 scala class AvroDeserializationScehema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):
package org.myorg.quickstart
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._
import java.io.IOException
class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
def deserialize(message: Array[Byte]): T = {
ensureInitialized()
try {
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
}
catch {
case e: IOException => {
throw new RuntimeException(e)
}
}
}
def isEndOfStream(nextElement: T): Boolean = false
def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)
private def ensureInitialized() {
if (reader == null) {
if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) {
reader = new SpecificDatumReader[T](avroType)
}
else {
reader = new ReflectDatumReader[T](avroType)
}
}
}
}
在我的流媒体 class 中,我按如下方式使用它:
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
其中 DeviceData 是在同一项目中定义的 Scala 案例class
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double
)
编译 StreamingKafkaClient.scala class
时出现以下错误Error:(24, 102) object java.lang.Class is not a value
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))
也尝试过
val stream = env
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
有了这个我得到了一个不同的错误:
Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x: java.util.regex.Pattern,x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.regex.Pattern,x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.List[String],x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: java.util.List[String],x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: String,x: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x: String,x: org.apache.flink.api.common.serialization.DeserializationSchema[String],x: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
.addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))
我是 Scala 的新手(这是我的第一个 Scala 程序)所以我知道我在这里缺少一些基本的东西。当我尝试学习 Scala 时,有人可以指出我做错了什么。我的目的是基本上将 avro 编码的数据从 Kafka 读取到 Flink 中,并对流数据进行一些操作。我找不到任何使用 AvroDeserializationSchema 的示例 class,在我看来,这应该是原生内置到 Flink 包中的东西。
为了在 Scala 中得到一个 class 对象,你需要 classOf[DeviceData]
,而不是 Class[DeviceData]
new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
I could not find any examples of the usage of AvroDeserializationSchema class
此外,看起来在 Flink 1.6 版本中,他们会添加此 class 而不是您从其他地方复制。 FLINK-9337 & FLINK-9338
如评论中所述,如果您想使用 Confluent Avro Schema Registry 而不是给出 class 类型,
此外,如果您是 运行 Kafka 0.11+(或 Confluent 3.3+),那么理想情况下您应该使用 FlinkKafkaConsumer011
以及您要反序列化的 class
new FlinkKafkaConsumer011[DeviceData]