无法以 AVRO 格式从 Kafka Producer 发送 GenericRecord 数据
Unable to send GenericRecord data from Kafka Producer in AVRO format
使用 confluent-oss-5.0.0-2.11
我的 Kafka Producer 代码是
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
//props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
GenericRecord record = new GenericData.Record(schema);
record.put("uID", "06080000");
record.put("userName", "User data10");
record.put("company", "User data10");
record.put("age", 12);
record.put("location", "User data10");
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
producer.send(recordData);
System.out.println("Message Sent");
}
}
Producer 代码似乎没问题,可以在控制台上看到 Message Sent。
Kafka Consumer代码是:
public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("group.id", "consumer1");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
for (ConsumerRecord<String, GenericRecord> rec : recs) {
System.out.printf("{AvroUtilsConsumerUser}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
}
}
}
}
我无法在 Kafka 消费者端看到消息(数据)。我还检查了 confluent_new 主题的偏移量 count/status 并且它没有更新。生产者代码似乎有问题。
任何指针都会有所帮助。
同时下面的生产者代码正在运行,这里的 POJO 即 User 是 avro-tools 生成的 POJO。
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Producer<String, User> producer = new KafkaProducer<String, User>(props);
User user = new User();
user.setUID("0908");
user.setUserName("User data10");
user.setCompany("HCL");
user.setAge(20);
user.setLocation("Noida");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
producer.send(record).get();
System.out.println("Sent");
}
}
P.S. 我的要求是以 AVRO 格式将接收到的 JSON 数据从源 KAFKA 主题发送到目标 KAFKA 主题。首先,我使用 AVRO4S 从收到的 JSON 数据中推断出 AVRO 模式,并将该模式注册到 SCHEMA REGISTRY。接下来是从接收到的 JSON 中提取数据并填充到 GenericRecord 实例中,然后使用 KafkaAvroSerializer 将此 GenericRecord 实例发送到 Kafka 主题。在消费者端,我将使用 KafkaAvroDeserializer 反序列化接收到的 AVRO 数据。
请尝试在第一个 Producer 中添加 get()
producer.send(recordData).get();
在寻找解决方案的过程中,我尝试了 Thread.sleep(1000),它解决了我的问题。我也试过 producer.send(record).get() ,这也解决了这个问题。在完成 Documentation 之后,我发现了下面的代码片段,它提示了解决方案。
// When you're finished producing records, you can
flush the producer to ensure it has all been `written` to Kafka and
// then close the producer to free its resources.
finally {
producer.flush();
producer.close();
}
这是解决此问题的最佳方法。
使用 confluent-oss-5.0.0-2.11 我的 Kafka Producer 代码是
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
//props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
GenericRecord record = new GenericData.Record(schema);
record.put("uID", "06080000");
record.put("userName", "User data10");
record.put("company", "User data10");
record.put("age", 12);
record.put("location", "User data10");
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
producer.send(recordData);
System.out.println("Message Sent");
}
}
Producer 代码似乎没问题,可以在控制台上看到 Message Sent。
Kafka Consumer代码是:
public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("group.id", "consumer1");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
for (ConsumerRecord<String, GenericRecord> rec : recs) {
System.out.printf("{AvroUtilsConsumerUser}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
}
}
}
}
我无法在 Kafka 消费者端看到消息(数据)。我还检查了 confluent_new 主题的偏移量 count/status 并且它没有更新。生产者代码似乎有问题。 任何指针都会有所帮助。
同时下面的生产者代码正在运行,这里的 POJO 即 User 是 avro-tools 生成的 POJO。
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Producer<String, User> producer = new KafkaProducer<String, User>(props);
User user = new User();
user.setUID("0908");
user.setUserName("User data10");
user.setCompany("HCL");
user.setAge(20);
user.setLocation("Noida");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
producer.send(record).get();
System.out.println("Sent");
}
}
P.S. 我的要求是以 AVRO 格式将接收到的 JSON 数据从源 KAFKA 主题发送到目标 KAFKA 主题。首先,我使用 AVRO4S 从收到的 JSON 数据中推断出 AVRO 模式,并将该模式注册到 SCHEMA REGISTRY。接下来是从接收到的 JSON 中提取数据并填充到 GenericRecord 实例中,然后使用 KafkaAvroSerializer 将此 GenericRecord 实例发送到 Kafka 主题。在消费者端,我将使用 KafkaAvroDeserializer 反序列化接收到的 AVRO 数据。
请尝试在第一个 Producer 中添加 get()
producer.send(recordData).get();
在寻找解决方案的过程中,我尝试了 Thread.sleep(1000),它解决了我的问题。我也试过 producer.send(record).get() ,这也解决了这个问题。在完成 Documentation 之后,我发现了下面的代码片段,它提示了解决方案。
// When you're finished producing records, you can
flush the producer to ensure it has all been `written` to Kafka and
// then close the producer to free its resources.
finally {
producer.flush();
producer.close();
}
这是解决此问题的最佳方法。