收集未成功写入 Kafka 的消息
Collecting messages which are not successfully written on Kafka
我正在读取文件并将每条记录转储到 Kafka 上。这是我的制作人代码:
public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
try (BufferedReader bf = getBufferedReader(filePath, encoding);
KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
String line;
long count = 0;
while ((line = bf.readLine()) != null) {
count++;
producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
if(e != null){
e.printStackTrace();
//write record to some file.
}
});
}
producer.flush();
CustomLogger.log("Done producing data messages. Total no of records produced:" + count);
} catch (IOException e) {
Throwables.propagate(e);
}
}
private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 4);
return new KafkaProducer<>(properties);
}
private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}
根据我们的基本测试,生成消息可能会因 TimeoutException 而失败。然而,根据 official documentation of Callback TimeoutException 是一个可重试的异常。意味着在下次重试时可能会生成此消息。因此,如果我在回调中发现 TimeoutException,我不能认为记录发送失败。有什么可靠的方法可以肯定地说记录发送失败并将其记录在单独的文件中吗?
我粗略地看了一下代码,不认为您需要在此处区分可重试异常和 non-retriable 异常,因为这已经在 KafkaProducer 中发生了。
当您使用大于 1 的 retries 值配置生产者时,它将重新发送任何因可重试异常而失败的消息(批处理),次数与您告诉它的次数一样多到,在将异常返回给您之前。
所以基本上,您收到的任何消息都带有生产者放弃的异常。
查看代码中的 completeBatch & canRetry 以确认我的理解,但我个人认为这种行为是有道理的。
我正在读取文件并将每条记录转储到 Kafka 上。这是我的制作人代码:
public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
try (BufferedReader bf = getBufferedReader(filePath, encoding);
KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
String line;
long count = 0;
while ((line = bf.readLine()) != null) {
count++;
producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
if(e != null){
e.printStackTrace();
//write record to some file.
}
});
}
producer.flush();
CustomLogger.log("Done producing data messages. Total no of records produced:" + count);
} catch (IOException e) {
Throwables.propagate(e);
}
}
private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 4);
return new KafkaProducer<>(properties);
}
private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}
根据我们的基本测试,生成消息可能会因 TimeoutException 而失败。然而,根据 official documentation of Callback TimeoutException 是一个可重试的异常。意味着在下次重试时可能会生成此消息。因此,如果我在回调中发现 TimeoutException,我不能认为记录发送失败。有什么可靠的方法可以肯定地说记录发送失败并将其记录在单独的文件中吗?
我粗略地看了一下代码,不认为您需要在此处区分可重试异常和 non-retriable 异常,因为这已经在 KafkaProducer 中发生了。
当您使用大于 1 的 retries 值配置生产者时,它将重新发送任何因可重试异常而失败的消息(批处理),次数与您告诉它的次数一样多到,在将异常返回给您之前。
所以基本上,您收到的任何消息都带有生产者放弃的异常。
查看代码中的 completeBatch & canRetry 以确认我的理解,但我个人认为这种行为是有道理的。