Converting java.util.HashMap to JSON in Scala after serializing/deserializing Avro Record

I am trying to serialize/deserialize an Avro record containing a map complex type in Scala.

After deserializing, I cannot convert the resulting HashMap to JSON with Jackson.

I expect the following output:

{"MyKey2":"MyValue2", "MyKey1":"MyValue1"}

But instead I get the following output:

{"MyKey2":{"bytes":"TXlWYWx1ZTI=","length":8,"byteLength":8},"MyKey1":{"bytes":"TXlWYWx1ZTE=","length":8,"byteLength":8}}

Any clues on how to handle the HashMap after deserialization? Code:

import java.io.ByteArrayOutputStream

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io._
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}

object ScalaSandbox {

  def main(args: Array[String]) {

    //Avro Schema and Schema Parser
    val userSchema =
      """
        |{
        |  "type":"record",
        |  "name":"myrecord",
        |  "fields": [
        |    {"name": "test_str", "type":"string"},
        |    {"name": "test_map", "type": ["null", {"type": "map", "values": "string"}]}
        |  ]
        |}
      """.stripMargin
    val parser = new Schema.Parser()
    val schema = parser.parse(userSchema)

    //Create Record
    val f2map = new java.util.HashMap[String,String]
    f2map.put("MyKey1", "MyValue1")
    f2map.put("MyKey2", "MyValue2")
    val avroRecord: Record = new Record(schema)
    avroRecord.put("test_str", "test")
    avroRecord.put("test_map", f2map)

    //Serialize Record to Avro
    val writer = new SpecificDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(avroRecord, encoder)
    encoder.flush()
    out.close()
    val serializedBytes: Array[Byte] = out.toByteArray()

    //Deserialize Record from Avro
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null)
    val userData: GenericRecord = reader.read(null, decoder)

    //Convert HashMap to JSON
    val test_str: String = userData.get("test_str").toString
    val test_map: java.util.HashMap[String,String] = userData.get("test_map").asInstanceOf[java.util.HashMap[String,String]]
    val example = new Example(test_str, test_map)

    println("toString of HashMap: " + example.get_map.toString) // {MyKey2=MyValue2, MyKey1=MyValue1}
    println("writeValueAsString of Hashmap: " + example.get_map_json) // expected: {"MyKey2":"MyValue2", "MyKey1":"MyValue1"}
  }

  class Example(str_field: String, map_field: java.util.HashMap[String,String]) {
    val mapper = new ObjectMapper()
    def get_str: String = str_field
    def get_map: java.util.HashMap[String,String] = map_field
    def get_map_json: String = mapper.writeValueAsString(map_field)
  }

}

Try changing the mapper.writeValueAsString code in the Example class; the problem may be in how Jackson is being used:

mapper.writeValueAsString(map_field.toString.replaceAll("=", ":"))

The deserialized map cannot be serialized correctly with the Jackson library because, since Avro 1.5, the Avro map complex type uses org.apache.avro.util.Utf8 for its keys and values.

By treating the deserialized map object as a java.util.HashMap[Utf8,Utf8], I was able to convert its keys and values to JSON, but in a very inefficient way.
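That workaround looked roughly like this (a sketch: `utf8ToStringMap` is a name introduced here, and `CharSequence` stands in for `Utf8`, which implements it, so the snippet carries no Avro dependency):

```scala
// Sketch of the manual workaround: Avro hands back a map whose keys and
// values are org.apache.avro.util.Utf8 (a CharSequence), so Jackson ends up
// serializing the Utf8 internals (bytes/length) instead of plain strings.
// Rebuilding the map with toString on every key and value fixes that.
def utf8ToStringMap(m: java.util.Map[CharSequence, CharSequence]): java.util.HashMap[String, String] = {
  val out = new java.util.HashMap[String, String]
  val it = m.entrySet().iterator()
  while (it.hasNext) {
    val e = it.next()
    // Utf8.toString decodes the underlying bytes to a java.lang.String
    out.put(e.getKey.toString, e.getValue.toString)
  }
  out
}
```

A map converted this way serializes through Jackson as plain `{"key":"value"}` pairs, at the cost of copying the whole map.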

In any case, I was mistakenly trying to do by hand something the Avro library can easily do itself with jsonEncoder.

So, assuming we have already deserialized an Avro payload into a GenericRecord, we can convert it to JSON like this:

  import java.io.ByteArrayOutputStream
  import java.nio.charset.StandardCharsets
  import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
  import org.apache.avro.io.EncoderFactory

  def convertGenericRecordtoJson(record: GenericRecord): String = {
    val outputStream = new ByteArrayOutputStream()
    // jsonEncoder renders the record as JSON according to its own schema
    val jsonEncoder = EncoderFactory.get().jsonEncoder(record.getSchema, outputStream)
    val datumWriter = new GenericDatumWriter[GenericRecord](record.getSchema)
    datumWriter.write(record, jsonEncoder)
    jsonEncoder.flush()
    outputStream.flush()
    new String(outputStream.toByteArray, StandardCharsets.UTF_8)
  }

This function produces a valid JSON string:

{"test_str":"test","test_map":{"map":{"MyKey2":"MyValue2","MyKey1":"MyValue1"}}}
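Note the extra `{"map": ...}` level around `test_map`: Avro's JSON encoding tags the non-null branch of a union (here `["null", {"type": "map", ...}]`) with its type name. If the flat form from the original question is needed, one option (a sketch, not part of the original answer) is to re-parse the encoder output with Jackson and splice out the wrapper node:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// Hypothetical post-processing step: replace the union-branch wrapper
// {"map": {...}} under "test_map" with the inner map object itself.
val mapper = new ObjectMapper()
val encoded = """{"test_str":"test","test_map":{"map":{"MyKey2":"MyValue2","MyKey1":"MyValue1"}}}"""
val root = mapper.readTree(encoded).asInstanceOf[ObjectNode]
root.replace("test_map", root.get("test_map").get("map"))
val flat = mapper.writeValueAsString(root)
// flat: {"test_str":"test","test_map":{"MyKey2":"MyValue2","MyKey1":"MyValue1"}}
```

This keeps the Avro-side code untouched and confines the union handling to one line of JSON tree surgery.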