Deserializing objects in Avro with a Map<String,Object> field returns values with the wrong class

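I'm trying to serialize objects in Apache Avro that contain a Map instance. The string keys of the Map are deserialized correctly, but the values come back as plain instances of class Object.

I'm able to use a GenericDatumWriter with GenericData.Record instances and copy the properties into them, but I need to serialize the objects directly, without having to copy the Map properties into a temporary object just to serialize it.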

public void test1() throws IOException {

    TimeDot dot = new TimeDot();
    dot.lat = 12;
    dot.lon = 34;
    dot.putProperty("id", 1234);
    dot.putProperty("s", "foo");
    System.out.println("BEFORE: " + dot);

    // serialize
    ReflectDatumWriter<TimeDot> reflectDatumWriter = new ReflectDatumWriter<>(TimeDot.class);
    Schema schema = ReflectData.get().getSchema(TimeDot.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataFileWriter<TimeDot> writer = new DataFileWriter<>(reflectDatumWriter).create(schema, out);
    writer.append(dot);
    writer.close();

    // deserialize
    ReflectDatumReader<TimeDot> reflectDatumReader = new ReflectDatumReader<>(TimeDot.class);
    ByteArrayInputStream inputStream = new ByteArrayInputStream(out.toByteArray());
    DataFileStream<TimeDot> reader = new DataFileStream<>(inputStream, reflectDatumReader);
    Object dot2 = reader.next();
    reader.close();
    System.out.println("AFTER: " + dot2);
}

public static class TimeDot {
    Map<String, Object> props = new LinkedHashMap<>();
    double lat;
    double lon;

    public void putProperty(String key, Object value) {
        props.put(key, value);
    }

    public String toString() {
        return "lat="+ lat +", lon="+ lon +", props="+props;
    }
}

Output:

 BEFORE: lat=12.0, lon=34.0, props={id=1234, s=foo}

 AFTER:  lat=12.0, lon=34.0, props={id=java.lang.Object@2b9627bc, s=java.lang.Object@65e2dbf3}
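A likely reason for the Object@... values: a reflection-based schema can only be derived from the declared field type, and the declared value type here is java.lang.Object, which carries no information about Integer or String. The sketch below uses only the JDK (no Avro) to show what reflection sees for the props field. The class name DeclaredValueType and the helper typeArguments are hypothetical names introduced for illustration, and the claim about how Avro uses this information is an assumption, not something confirmed by the output above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.Map;

class DeclaredValueType {
    // Same shape as the TimeDot class above, reduced to the map field.
    static class TimeDot {
        Map<String, Object> props = new LinkedHashMap<>();
    }

    // Returns the declared type arguments of a generic field,
    // for example [String, Object] for Map<String, Object>.
    static Type[] typeArguments(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            ParameterizedType type = (ParameterizedType) field.getGenericType();
            return type.getActualTypeArguments();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Type[] params = typeArguments(TimeDot.class, "props");
        System.out.println("key type:   " + params[0].getTypeName());
        System.out.println("value type: " + params[1].getTypeName());
    }
}
```

The runtime classes of the values (Integer, String) never appear in the declared type, so they have to be supplied some other way, for example through an explicit schema.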

Next I tried creating the schema manually, but then serialization fails:

Exception in thread "main" java.lang.NullPointerException: in TimeDot in map in java.lang.Object null of java.lang.Object of map in field props of TimeDot

public void test2() throws IOException {        

    TimeDot dot = new TimeDot();
    dot.lat = 12;
    dot.lon = 34;
    dot.putProperty("id", 1234);
    dot.putProperty("s", "foo");
    System.out.println(dot);

    // create Schema
    List<Schema.Field> propFields = new ArrayList<>();
    propFields.add(new Schema.Field("id", Schema.create(Schema.Type.INT)));
    propFields.add(new Schema.Field("s", Schema.create(Schema.Type.STRING)));
    Schema propRecSchema = Schema.createRecord("Object",null,"java.lang",false,propFields);
    Schema propSchema = Schema.createMap(propRecSchema);
    List<Schema.Field> fields = new ArrayList<>(3);
    fields.add(new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)));
    fields.add(new Schema.Field("lon", Schema.create(Schema.Type.DOUBLE)));
    fields.add(new Schema.Field("props", propSchema));
    Schema schema = Schema.createRecord("TimeDot", null, "", false, fields);
    System.out.println("\nschema:\n" + schema);

    // serialize
    ReflectDatumWriter<TimeDot> reflectDatumWriter = new ReflectDatumWriter<>(TimeDot.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataFileWriter<TimeDot> writer = new DataFileWriter<>(reflectDatumWriter).create(schema, out);
    writer.append(dot); // *** fails here > NullPointerException ***
    writer.close();

    // deserialize
    ReflectDatumReader<TimeDot> reader = new ReflectDatumReader<>(schema);
    TimeDot dot2 = reader.read(null,
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    System.out.println(dot2);
}

I think the simplest way is to add an annotation to the field:

@org.apache.avro.reflect.AvroSchema("{\"type\": \"map\", \"values\": [\"string\", \"int\"]}")
Map<String, Object> props = new LinkedHashMap<>();

To serialize an object that contains a Map, you must define a union in the Avro schema that lists every possible type of the map's values.
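To make "every possible type" concrete, here is a minimal JDK-only sketch that inspects the values of a map and prints the map schema JSON with the union branches it would need. UnionSchemaSketch, avroTypeName, and mapSchemaFor are hypothetical helpers, not part of Avro; only the Avro primitive type names ("int", "long", "string", and so on) come from the Avro specification. It is meant for non-empty maps with primitive-like values.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class UnionSchemaSketch {
    // Maps a Java value to the name of the matching Avro primitive type.
    static String avroTypeName(Object value) {
        if (value == null) return "null";
        if (value instanceof Integer) return "int";
        if (value instanceof Long) return "long";
        if (value instanceof Float) return "float";
        if (value instanceof Double) return "double";
        if (value instanceof Boolean) return "boolean";
        if (value instanceof String) return "string";
        throw new IllegalArgumentException("No union branch for " + value.getClass());
    }

    // Builds the JSON for a map schema whose values are a union of
    // the distinct value types found in the given map, in first-seen order.
    static String mapSchemaFor(Map<String, ?> props) {
        Set<String> branches = new LinkedHashSet<>();
        for (Object value : props.values()) {
            branches.add("\"" + avroTypeName(value) + "\"");
        }
        return "{\"type\": \"map\", \"values\": [" + String.join(", ", branches) + "]}";
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("id", 1234);
        props.put("s", "foo");
        // prints {"type": "map", "values": ["int", "string"]}
        System.out.println(mapSchemaFor(props));
    }
}
```

The resulting string has the same shape as the one passed to @AvroSchema above. A value of any unlisted type has no branch in the union, which is why the sketch rejects it instead of guessing.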

Important: if the namespace is not set correctly, deserialization returns a GenericData.Record instead of an instance of the TimeDot class.

public void test3() throws IOException {

    List<Schema.Field> fields = new ArrayList<>();
    fields.add(new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)));
    fields.add(new Schema.Field("lon", Schema.create(Schema.Type.DOUBLE)));
    fields.add(new Schema.Field("props", Schema.createMap(
            Schema.createUnion(Arrays.asList(
                Schema.create(Schema.Type.INT),
                Schema.create(Schema.Type.STRING))))));

    Schema schema = Schema.createRecord("TimeDot", null, "TestAvroUnion", false, fields);

    TimeDot dot = new TimeDot();
    dot.lat = 12;
    dot.lon = 34;
    dot.putProperty("id", 1234);
    dot.putProperty("s", "foo");
    System.out.println("BEFORE: " + dot);

    // serialize
    ReflectDatumWriter<TimeDot> reflectDatumWriter = new ReflectDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataFileWriter<TimeDot> dataWriter = new DataFileWriter<>(reflectDatumWriter);
    dataWriter.create(schema, out);
    dataWriter.append(dot);
    dataWriter.close();

    // deserialize
    ReflectDatumReader<TimeDot> reflectDatumReader = new ReflectDatumReader<>(schema);
    try(
        ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray());
        DataFileStream<TimeDot> reader = new DataFileStream<>(bis, reflectDatumReader)
    ) {
        TimeDot dot2 = reader.next();
        System.out.println("AFTER:  " + dot2);
    }
}

This produces the following output:

 BEFORE: lat=12.0, lon=34.0, props={id=1234, s=foo}
 AFTER:  lat=12.0, lon=34.0, props={id=1234, s=foo}

Alternatively, build the schema with SchemaBuilder:

 Schema schema = SchemaBuilder
            .record("TimeDot")
            .namespace("TestUnion")
            .fields()
            .name("lat")
                .type().doubleType()
                .noDefault()
            .name("lon")
                .type().doubleType()
                .noDefault()
            .name("props")
                .type().map()
                    .values(SchemaBuilder.unionOf().intType().and().stringType().endUnion())
                .noDefault()
            .endRecord();