Nodejs avro serialization without schema registry followed by deserialization in Kafka Streams
I'd like some guidance on the following problem. I'm trying to learn how to serialize data as Avro from Node.js without a schema registry, publish it to a Kafka cluster, and then consume it from Kafka Streams (Java).
On the JavaScript side I'm using kafka-node together with avsc for the serialization. On the Kafka Streams side I decided to implement a custom Serde because, as far as I can tell, the Avro Serdes provided with the Streams API are designed to fetch the schema directly from a schema registry.
Here is the JavaScript snippet of a simple producer:
const avro = require('avsc');

const messageKey = "1";
const schemaType = avro.Type.forSchema({
  type: "record",
  name: "product",
  fields: [
    {
      name: "id",
      type: "int"
    },
    {
      name: "name",
      type: "string"
    },
    {
      name: "price",
      type: "double"
    },
    {
      name: "stock",
      type: "int"
    }
  ]
});

// id, name, price and stock hold the product values; producer is a kafka-node
// Producer that has already been created and connected elsewhere.
const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.send(payload, sendCallback);
And here is how I'm currently trying to implement the deserializer:
public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputstream = new SeekableByteArrayInput(data);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
    DataFileReader<GenericRecord> dataFileReader;
    Product product = null;
    try {
        dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
        GenericRecord record = new GenericData.Record(schema);
        while (dataFileReader.hasNext()) {
            dataFileReader.next();
            product = genericRecordToObject(record, new Product());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return product;
}
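For completeness, the Serde itself is wired into the topology roughly like this (simplified; ProductAvroSerde is the Serde class that appears in the stack trace below):

// Simplified sketch of how the custom Serde is plugged into the Streams topology.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("product", Consumed.with(Serdes.String(), new ProductAvroSerde()))
       .foreach((key, product) -> System.out.println(product));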
However, when the Streams application tries to deserialize the data, I get the following error, specifically on the line that instantiates the DataFileReader:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
I'm not sure how to proceed from here. Any advice would be appreciated.
Maybe I'm wrong, but I don't think you should use a DataFileReader there; you should use a DatumReader instead. avsc's toBuffer() writes plain Avro binary without the container-file header, which is why DataFileReader complains that the bytes are not an Avro data file.
I did something similar with plain Kafka (not Kafka Streams); maybe it gives you some ideas:
The full example (it's very simple) is here:
https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java
As you can see, I didn't create a serializer; I just deserialize the value and get a GenericRecord.
public static void main(String[] args) {

    final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singleton(TOPIC));
    ConsumerRecords<String, byte[]> consumerRecords;

    String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
            + "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
            + "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
            + "]}";
    Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroValueSchema);

    try {
        while (true) {
            consumerRecords = consumer.poll(1000);
            consumerRecords.forEach(record -> {
                // The record value is raw Avro binary, so decode it with a BinaryDecoder.
                ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                GenericRecord deserializedValue = null;
                try {
                    deserializedValue = datumReader.read(null, binaryDecoder);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
            });
            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
        System.out.println("DONE");
    }
}
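Applied to your Kafka Streams Serde, the deserialize method could look roughly like this (just a sketch: it assumes the same product schema you defined on the Node.js side and reuses your genericRecordToObject helper):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.serialization.Deserializer;

public class ProductDeserializer implements Deserializer<Product> {

    // Same record definition that avsc uses on the producer side (field order matters).
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"product\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"double\"},"
            + "{\"name\":\"stock\",\"type\":\"int\"}]}");

    private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(SCHEMA);

    @Override
    public Product deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // toBuffer() produces raw Avro binary, so decode it with a BinaryDecoder
            // instead of a DataFileReader (which expects the container-file magic bytes).
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            GenericRecord record = datumReader.read(null, decoder);
            return genericRecordToObject(record, new Product()); // your existing mapping helper
        } catch (IOException e) {
            throw new RuntimeException("Could not deserialize product from topic " + topic, e);
        }
    }
}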
I hope it helps.