Unable to decode Custom object at Avro Consumer end in Kafka
I have a concrete class that I am serializing to a byte array and sending to a Kafka topic.
For serialization I use ReflectDatumWriter.
Before sending the bytes[], I put the schema ID in the first 4 bytes, following some online tutorials.
I am able to send the message, but when consuming it with the Avro console consumer I get this response:
./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
MParams ddb = new MParams();
ddb.setKey("ss");
for (int i = 0; i < 10; i++) {
    // Manually framed value: hard-coded schema ID 1 prepended to the Avro bytes
    ProducerRecord<String, byte[]> record =
            new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
    Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);
    System.out.println("Success " + resp.get());
}
public static <T> byte[] serialize(T data) {
    Schema schema = null;
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, new EncoderFactory().directBinaryEncoder(out, null));
        byte[] bytes = out.toByteArray();
        return bytes;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}
public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Confluent wire format: magic byte 0x0, then a 4-byte big-endian schema ID,
    // then the Avro binary payload
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder", e);
    }
}
@Data
public class MParams extends MetricParams {
    // POJO version
    @Nullable
    private String key;
}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value = {MParams.class})
public abstract class MetricParams {
}
Working serializer snippet:
public byte[] serialize(String topic, T record) {
    Schema schema;
    int id;
    try {
        schema = ReflectData.get().getSchema(record.getClass());
        id = client.register(topic + "-value", schema);
    } catch (IOException | RestClientException e) {
        throw new RuntimeException(e);
    }
    return serializeImpl(id, schema, record);
}
protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0);
        out.write(ByteBuffer.allocate(4).putInt(id).array());
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error serializing Avro message", e);
    }
}
Deserializer:
protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
    // If the payload is null there is nothing to decode; the caller must
    // handle this case.
    if (payload == null) {
        return null;
    }
    int id = -1;
    try {
        ByteBuffer buffer = getByteBuffer(payload);
        id = buffer.getInt();
        // Skip the 1-byte magic and the 4-byte schema ID consumed above
        int length = buffer.limit() - 1 - 4;
        int start = buffer.position() + buffer.arrayOffset();
        DatumReader<T> reader = new ReflectDatumReader<T>(schema);
        T res = reader.read(null, new DecoderFactory().binaryDecoder(buffer.array(), start, length, null));
        return res;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error deserializing Avro message for id " + id, e);
    }
}
private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
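For illustration, a minimal in-process round-trip sketch using the two snippets above (not part of the original post; it assumes serialize/deserialize live in the same class, with client wired to the registry as in the working serializer):

// Sketch only: exercises serialize() and deserialize() back to back to
// confirm the header framing and the decoding line up.
MParams params = new MParams();
params.setKey("ss");

byte[] payload = serialize("Test", params);                  // registers schema, frames bytes
Schema schema = ReflectData.get().getSchema(MParams.class);  // same schema ReflectData derived
MParams decoded = deserialize(schema, payload);              // strips header, decodes body
System.out.println(decoded.getKey());                        // expect "ss"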
For serializing I am using ReflectDatumWriter. Before sending the bytes[] I am putting the schema ID in the first 4 bytes
It's unclear why you are trying to bypass the KafkaAvroSerializer class's default behavior. (In your case, drop the Schema.Parser from that example and use your Reflect record type rather than GenericRecord.)
You can use your concrete class as the producer's second type parameter, and as long as it implements the base Avro classes it will be serialized correctly (meaning the ID is computed properly rather than being some number you made up and converted to bytes), registered with the registry, and then sent to Kafka.
Most importantly, the schema ID in the registry is not necessarily 1, so the console consumer may be trying to deserialize your messages against the wrong schema, producing the garbage output you see.
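If in doubt, you can ask the registry which ID it actually assigned. A sketch, assuming Confluent's CachedSchemaRegistryClient (same client era as the client.register call above) and the default Test-value subject name:

// Hypothetical helper, not from the original post. getId throws
// IOException / RestClientException, so declare or handle them.
static int lookupRegisteredId() throws IOException, RestClientException {
    SchemaRegistryClient registry = new CachedSchemaRegistryClient("http://0:8081", 100);
    return registry.getId("Test-value", ReflectData.get().getSchema(MParams.class));
}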
In other words, try

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
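A minimal end-to-end sketch of that approach (hosts and topic copied from the question; the schema.reflection setting is an assumption and only exists in serializer versions that support reflection records):

// Sketch, not the poster's code. Assumes Confluent's KafkaAvroSerializer is on
// the classpath and the registry runs at the URL from the question.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "0:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://0:8081");
props.put("schema.reflection", "true"); // assumption: needed for plain POJOs, where supported

Producer<String, MParams> producer = new KafkaProducer<>(props);
MParams ddb = new MParams();
ddb.setKey("ss");
producer.send(new ProducerRecord<>("Test", "1", ddb)); // ID assigned by the registry
producer.close();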