Read Existing Avro File and Send to Kafka
I have an existing Avro file that contains a schema. I need to send the file to a producer.
Here is the code I wrote:
public class ProducerDataSample {
    public static void main(String[] args) {
        String topic = "my-topic";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroSchemaDefinitionLoader.fromFile("encounter.avsc").get());
        File file = new File("/home/hello.avro");
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
            DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            dataFileWriter.create(schema, outputStream);
            dataFileWriter.appendTo(file);
            dataFileWriter.close();
            System.out.println("Here comes the data: " + outputStream);

            // Start KAFKA publishing
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
            ProducerRecord<String, byte[]> producerRecord = null;
            producerRecord = new ProducerRecord<String, byte[]>("m-topic", "1", outputStream.toByteArray());
            messageProducer.send(producerRecord);
            messageProducer.close();
        } catch (Exception e) {
            System.out.println("Error in sending to kafka");
            e.printStackTrace();
        }
    }
}
After running this, I get the following error:
Error in sending to kafka org.apache.avro.AvroRuntimeException: already open
at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:203)
at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:193)
at ProducerDataSample.main(ProducerDataSample.java:51)
Any help is appreciated. Thanks.
You have to read the data from the Avro file and serialize it into a byte array.
Something like the snippet below:
final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
File file ="sample.avro"
//read the avro file to GenericRecord
final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);
//serialize GenericRecords
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
while (genericRecords.hasNext()) {
    writer.write(genericRecords.next(), binaryEncoder);
}
binaryEncoder.flush();
out.close();
//send out.toByteArray() to kafka
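To finish the snippet, the byte array can be handed to a plain KafkaProducer<String, byte[]>. A minimal sketch; the broker address and topic name are placeholders, and note that all records from the file travel as a single Kafka message here:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
// the whole file's records go out as one message payload
producer.send(new ProducerRecord<>("my-topic", "1", out.toByteArray()));
producer.close();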
I think the other answer should send individual records as Kafka events, like this.
Note: it should be possible to get the schema directly from the Avro file rather than using a separate AVSC file; see the corresponding code in the Avro project.
final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
File file = new File("sample.avro");
//read the avro file to GenericRecord
final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
while (genericRecords.hasNext()) {
    //serialize each GenericRecord
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(genericRecords.next(), binaryEncoder);
    binaryEncoder.flush();
    out.close();
    // TODO: send out.toByteArray() to kafka
}
// TODO: kafkaProducer.flush();
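Putting it together, a minimal sketch that fills in both TODOs and also demonstrates the note above: DataFileReader (via DataFileStream) exposes the writer's schema through getSchema(), so the separate AVSC file can be dropped. The topic name and broker address are placeholders:

// the schema comes straight from the Avro data file, no AVSC needed
File file = new File("sample.avro");
DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, new GenericDatumReader<>());
Schema schema = genericRecords.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);

while (genericRecords.hasNext()) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(genericRecords.next(), binaryEncoder);
    binaryEncoder.flush();
    // one Kafka event per Avro record
    kafkaProducer.send(new ProducerRecord<>("my-topic", out.toByteArray()));
}
kafkaProducer.flush();
kafkaProducer.close();
genericRecords.close();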