Kafka Avro Serializer: org.apache.avro.AvroRuntimeException: not open

I am using Apache Kafka with the Avro serializer, using the specific record format. I am trying to create my own custom class and use it as the Kafka message value, but when I try to send a message I get the following exception:

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

My Avro schema file is as follows:

{
    "namespace": "customer.avro",
    "type": "record",
    "name": "Customer",
    "fields": [{
        "name": "id",
        "type": "int"
    }, {
        "name": "name",
        "type": "string"
    }]
}

The Customer class:

public class Customer {
    public int id;
    public String name;

    public Customer() {
    }

    public Customer(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Data serialization with Avro:

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
    kafkaProducer.send(record);
}

Customer customer1 = new Customer(1001, "James");

Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
        "Customer One", dataFileWriter
);
fireAndForget(record1);

I want to use a SpecificDatumWriter rather than the generic writer. What is this error related to?

Kafka receives a key/value pair to serialize, and you are handing it a DataFileWriter, which is not the value you want serialized, so that is not going to work. (The "not open" exception itself is thrown because DataFileWriter.append() is called before the writer has been opened with create(), but a DataFileWriter is the wrong tool here anyway.)
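
For completeness, a DataFileWriter is meant for writing Avro container files, and it only becomes usable after create() (or appendTo()) has been called, which is exactly what the stack trace is complaining about. A minimal sketch of that pattern, assuming Customer is an Avro-generated SpecificRecord class and using an illustrative output file name:

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.create(schema, new File("customers.avro")); // opens the writer; illustrative path
dataFileWriter.append(customer1);
dataFileWriter.close();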

What you need to do is create a byte array of the serialized Avro data, using a BinaryEncoder writing to a ByteArrayOutputStream, and then pass that to a ProducerRecord<String, byte[]>:

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream os = new ByteArrayOutputStream();

try {
  // Serialize the record into the stream as raw Avro binary.
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
  writer.write(customer1, encoder);
  encoder.flush();

  byte[] avroBytes = os.toByteArray();
  ProducerRecord<String, byte[]> record1 =
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes);

  kafkaProducer.send(record1);
} finally {
  os.close();
}