Avro GenericRecord 到嵌套的 POJO

Avro GenericRecord to nested POJO

有没有办法将 GenericRecord(我刚从 Kafka 消息中得到)反序列化为嵌套的 POJO?我实际上想将它反序列化为 Scala 的案例 class 但我意识到这更难。我通过互联网搜索,似乎每个人都在手动进行。您是否知道任何能够执行此操作的图书馆?

应用模式有一个非常通用的编解码器推导解决方案:

https://github.com/danslapman/morphling

它不提供 "import and use" 解决方案,但它确实提供了一种方法来为您的协议编写您自己的编解码器派生机制,而不会弄乱 shapeless/magnolia。

此外,如果您需要处理二进制数据,请尝试:

https://github.com/scodec/scodec

它提供了解决此类问题的漂亮的 scala 方法。

我想到了这个:

  def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4


val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
    def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
      this.configure(new KafkaAvroDeserializerConfig(configs))

  def deserialize(topic: String, data: Array[Byte]): A = {
      val bytes = ByteBuffer.wrap(data)
      bytes.get() // skip magic byte
      val schemaId = bytes.getInt()
      val writerSchema = schemaRegistry.getById(schemaId)
      val length = bytes.limit() - 1 - idSize
      val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
      val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
      reader.read(null.asInstanceOf[A], decoder)
    }

  def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer

}