Unable to decode Custom object at Avro Consumer end in Kafka

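I have a concrete class that I serialize into a byte array and send to a Kafka topic. For serialization I use ReflectDatumWriter. Before sending the byte[], and after following some online tutorials, I write a magic byte (0) and then put the schema ID in the next 4 bytes.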

I can send the messages, but when I consume them with the Avro console consumer, this is the output I get:

./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test

"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"

    MParams ddb = new MParams();
    ddb.setKey("ss");

    for (int i = 0; i < 10; i++) {
        // Value is framed by hand: magic byte + hardcoded schema ID 1 + Avro payload
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
        Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);

        System.out.println("Success" + resp.get());
    }
}

public static <T> byte[] serialize(T data) {
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        // Derive the writer schema from the runtime class via Avro reflection
        Schema schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, encoder);
        encoder.flush(); // harmless for a direct encoder, required if a buffered one is used
        return out.toByteArray();
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}
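To see what the payload after the header actually contains, the Avro binary encoding of MParams can be reproduced by hand. This is a sketch under the assumption that the reflect schema for MParams is a record with the single field key of type ["null","string"] (what @Nullable on a String field should produce); AvroPayloadSketch and its methods are hypothetical names used only for illustration, and real code should keep using ReflectDatumWriter.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical illustration: hand-rolled Avro binary encoding of a record whose
 * only field has the schema ["null", "string"]. Not a replacement for ReflectDatumWriter.
 */
final class AvroPayloadSketch {

    // Avro writes int/long as zig-zag, then base-128 varint with the low 7 bits first.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);            // zig-zag: 0->0, -1->1, 1->2, 2->4, ...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 data bits plus the continuation bit
            z >>>= 7;
        }
        out.write((int) z);                       // last byte, continuation bit clear
    }

    // Encodes { key: value } for a record with a single ["null","string"] field.
    static byte[] encodeNullableString(String value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (value == null) {
            writeLong(out, 0);                    // union branch 0 = "null", nothing follows
        } else {
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            writeLong(out, 1);                    // union branch 1 = "string"
            writeLong(out, utf8.length);          // string length in bytes
            out.write(utf8, 0, utf8.length);      // raw UTF-8 bytes
        }
        return out.toByteArray();
    }
}
```

So for ddb.setKey("ss") the payload should be the four bytes 02 04 73 73 (union branch 1, string length 2, then "ss"). Those bytes only decode correctly against that same schema, which is why the schema ID written in front of them matters.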

public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0); // magic byte
    try {
        // Schema ID as 4 big-endian bytes; here it comes from the caller, not from the registry
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder, msg: " + e.getMessage(), e);
    }
}

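@Data
public class MParams extends MetricParams {

    // POJO version

    // Must be org.apache.avro.reflect.Nullable for ReflectData to turn the
    // field schema into ["null","string"] instead of a plain "string"
    @Nullable
    private String key;

}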

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value= {MParams.class})
public abstract class MetricParams {

}

Working serializer snippet:

public byte[] serialize(String topic, T record) {
    if (record == null) {
        return null; // check before record.getClass() to avoid a NullPointerException
    }
    Schema schema;
    int id;
    try {
        // Derive the schema by reflection and register it; the registry returns the real ID
        schema = ReflectData.get().getSchema(record.getClass());
        id = client.register(topic + "-value", schema);
    } catch (IOException | RestClientException e) {
        throw new RuntimeException(e);
    }
    return serializeImpl(id, schema, record);
}

protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0);                                       // magic byte
        out.write(ByteBuffer.allocate(4).putInt(id).array()); // registry-assigned schema ID

        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error serializing Avro message", e);
    }
}

Deserializer:

protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
    // Even if the caller requests schema & version, a null payload
    // cannot include it. The caller must handle this case.
    if (payload == null) {
        return null;
    }

    int id = -1;
    try {
        ByteBuffer buffer = getByteBuffer(payload); // consumes and checks the magic byte
        id = buffer.getInt();                       // 4-byte schema ID, only used in the error message here
        int length = buffer.limit() - 1 - 4;        // payload length after the 5-byte header

        int start = buffer.position() + buffer.arrayOffset();
        // The reader schema is supplied by the caller rather than looked up by id
        DatumReader<T> reader = new ReflectDatumReader<T>(schema);
        T res = reader.read(null, DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null));
        return res;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error deserializing Avro message for id " + id, e);
    }
}

private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
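When the console consumer output looks wrong, a quick check is to dump the header of a raw value and see which schema ID it claims. The helper below is a hypothetical sketch that does the same header split as getByteBuffer and deserialize, but only formats the result as text.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical debugging helper: splits a framed Avro value into its magic byte,
 * 4-byte big-endian schema ID and payload, and renders them as text.
 */
final class FrameInspector {

    static String describe(byte[] value) {
        if (value == null || value.length < 5) {
            throw new IllegalArgumentException("value shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        int schemaId = buf.getInt();
        StringBuilder sb = new StringBuilder();
        sb.append("magic=").append(magic).append(" id=").append(schemaId).append(" payload=");
        while (buf.hasRemaining()) {
            sb.append(String.format("%02x", buf.get() & 0xFF)); // two hex digits per byte
            if (buf.hasRemaining()) {
                sb.append(' ');
            }
        }
        return sb.toString();
    }
}
```

For a value produced by build(1, Producer.serialize(ddb)) it should report id=1. Whether ID 1 in the registry actually refers to the MParams schema can then be checked with the registry's GET /schemas/ids/{id} endpoint.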


It's not clear why you are trying to bypass the KafkaAvroSerializer class's default behavior. (In your case, remove the Schema.Parser from that example and use your Reflect record type instead of GenericRecord.)

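You can use your concrete class as the producer's second type parameter. As long as the Avro serializer can derive a schema for it, it should be serialized correctly (meaning the ID is obtained correctly, rather than being some number you made up and converted to bytes), registered with the registry, and then sent to Kafka.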

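Most importantly, the schema ID in the registry is not necessarily 1, so the console consumer may try to deserialize your message with the wrong schema, which produces the incorrect output.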
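To make the ID point concrete, here is a toy in-memory stand-in for the registry. It is hypothetical and not the Confluent client API; it simply assigns IDs sequentially (the real registry's numbering is its own business) and shows that a reader resolves the writer schema from the header ID alone.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy, in-memory stand-in for a schema registry (hypothetical, not the Confluent API).
 * IDs are handed out by the registry; readers look schemas up purely by ID.
 */
final class ToyRegistry {
    private final Map<String, Integer> idsBySchema = new HashMap<>();
    private final Map<Integer, String> schemasById = new HashMap<>();
    private int nextId = 1; // sequential only for simplicity

    // Returns the existing ID for an identical schema, otherwise assigns a new one.
    int register(String schemaJson) {
        Integer existing = idsBySchema.get(schemaJson);
        if (existing != null) {
            return existing;
        }
        int id = nextId++;
        idsBySchema.put(schemaJson, id);
        schemasById.put(id, schemaJson);
        return id;
    }

    // What a consumer does: fetch the writer schema by the ID read from the header.
    String lookup(int id) {
        String schema = schemasById.get(id);
        if (schema == null) {
            throw new IllegalStateException("unknown schema id " + id);
        }
        return schema;
    }
}
```

If some other schema was registered first, MParams gets ID 2 in this toy, and a producer that hardcodes 1 makes every consumer decode the MParams bytes with the wrong schema.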

In other words, try

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
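A sketch of producer settings for that approach, assuming the Confluent KafkaAvroSerializer is on the classpath and that your version supports the schema.reflection setting (which makes the serializer derive schemas via Avro reflection, as ReflectData does in the question). ReflectProducerConfig is a hypothetical name, and acks=all is an assumption meant to match the "full ack" producer in the question.

```java
import java.util.Properties;

/**
 * Hypothetical helper: producer properties that let KafkaAvroSerializer register
 * the schema and write the registry's ID, instead of a hand-built header.
 */
final class ReflectProducerConfig {

    static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", registryUrl);
        props.put("schema.reflection", "true"); // assumption: verify against your serializer version
        props.put("acks", "all");                // assumption: mirrors the "full ack" producer
        return props;
    }
}
```

A KafkaProducer<String, MParams> created from these properties can then send new ProducerRecord<>("Test", "1", ddb) directly; the serializer registers the schema and writes the ID the registry actually returned.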