Flink 的通用 Avro 解串器:覆盖 getProducedType
Generic Avro Deserializer for Flink : override getProducedType
我想创建一个通用 Avro 解串器并将其与 Kafka/Flink 一起使用。
为此,我必须从 Flink API 扩展 DeserializationSchema:
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
// NOTE(review): this version intentionally does NOT compile — classOf[T] needs a
// concrete class type, but T is an erased type parameter. Kept as-is because the
// surrounding text explains this exact compile error.
class FidesGenericAvroDeserializer[T](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
extends DeserializationSchema[T] {
// Kafka topics are unbounded: never signal end-of-stream from the data itself.
override def isEndOfStream(nextElement: T): Boolean = false
// Decodes one Avro-binary record from the raw Kafka message bytes.
override def deserialize(message: Array[Byte]): T = {
AvroInputStream.binary[T](new ByteArrayInputStream(message)).iterator.toSeq.head
}
// Compile error here: "class type required but T found".
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
}
这样做会导致编译时出现问题,因为 T 似乎不是 class :
class type required but T found
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
我回答我自己的问题:必须为 T 引入一个 ClassTag 上下文界定,
并用 asInstanceOf 做一次强制类型转换。
这样就可以正常工作了:
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import scala.reflect.ClassTag
import scala.reflect._
/**
 * Generic Avro deserializer for Flink's Kafka connector.
 *
 * The ClassTag context bound supplies the runtime class of T, which Flink's
 * TypeExtractor needs; classOf[T] alone is illegal on an erased type parameter.
 * The avro4s implicits (SchemaFor / ToRecord / FromRecord) drive the actual
 * binary decoding.
 */
class FidesGenericAvroDeserializer[T: ClassTag](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
  extends DeserializationSchema[T] {

  // Kafka streams are unbounded; no element content ever marks the end.
  override def isEndOfStream(nextElement: T): Boolean = false

  /**
   * Decodes a single Avro-binary record from the raw message bytes.
   *
   * Fixes two issues in the original: the AvroInputStream is now closed
   * (it was leaked), and an empty payload raises an IOException — the
   * exception type DeserializationSchema.deserialize declares — instead
   * of a bare NoSuchElementException from `.head`.
   */
  override def deserialize(message: Array[Byte]): T = {
    val in = AvroInputStream.binary[T](new ByteArrayInputStream(message))
    try {
      in.iterator.toSeq.headOption.getOrElse(
        throw new java.io.IOException("Avro binary message contained no records")
      )
    } finally {
      in.close()
    }
  }

  // classOf[T] does not compile for a type parameter (type erasure); recover the
  // runtime class from the ClassTag and cast the raw TypeInformation back to T.
  override def getProducedType: TypeInformation[T] =
    TypeExtractor.getForClass(classTag[T].runtimeClass).asInstanceOf[TypeInformation[T]]
}
我想创建一个通用 Avro 解串器并将其与 Kafka/Flink 一起使用。
为此,我必须从 Flink API 扩展 DeserializationSchema:
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
// NOTE(review): this version intentionally does NOT compile — classOf[T] needs a
// concrete class type, but T is an erased type parameter. Kept as-is because the
// surrounding text explains this exact compile error.
class FidesGenericAvroDeserializer[T](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
extends DeserializationSchema[T] {
// Kafka topics are unbounded: never signal end-of-stream from the data itself.
override def isEndOfStream(nextElement: T): Boolean = false
// Decodes one Avro-binary record from the raw Kafka message bytes.
override def deserialize(message: Array[Byte]): T = {
AvroInputStream.binary[T](new ByteArrayInputStream(message)).iterator.toSeq.head
}
// Compile error here: "class type required but T found".
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
}
这样做会导致编译时出现问题,因为 T 似乎不是 class :
class type required but T found
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
我回答我自己的问题:必须为 T 引入一个 ClassTag 上下文界定,
并用 asInstanceOf 做一次强制类型转换。
这样就可以正常工作了:
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import scala.reflect.ClassTag
import scala.reflect._
/**
 * Generic Avro deserializer usable with Flink's Kafka source.
 *
 * The ClassTag context bound carries the runtime class of T (classOf[T] is
 * rejected on an erased type parameter); the avro4s implicits provide the
 * schema and record conversions used by the binary decoder.
 */
class FidesGenericAvroDeserializer[T: ClassTag](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
  extends DeserializationSchema[T] {

  // Streaming sources are unbounded: no element ever marks the end.
  override def isEndOfStream(nextElement: T): Boolean = false

  // Decode the first record of the Avro-binary payload.
  override def deserialize(message: Array[Byte]): T = {
    val input = AvroInputStream.binary[T](new ByteArrayInputStream(message))
    input.iterator.toSeq.head
  }

  // The ClassTag supplies the runtime class; the raw TypeInformation returned
  // by the extractor is then cast back to TypeInformation[T].
  override def getProducedType: TypeInformation[T] = {
    val runtimeClazz = classTag[T].runtimeClass
    TypeExtractor.getForClass(runtimeClazz).asInstanceOf[TypeInformation[T]]
  }
}