当我尝试生成从文件中读取的消息时,天真的 Kafka Producer 不工作
Naive Kafka Producer not working when I try to produce messages that are read from a file
我正在尝试使用 Java 编写一个简单的 Kafka Producer。该应用程序接受两个输入:
- 要向其生成消息的 Kafka 主题名称
- 包含要生成给 Kafka 的消息的文件路径
我写了下面的代码。当我运行它时,可以看到 System.out.println 语句打印出了预期的值,
但由于某种原因,消息并没有被发送到 Kafka。我应该修改什么才能让它正常工作?
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Command-line Kafka producer that publishes every line of a text file to a topic.
 *
 * <p>Expected arguments: {@code args[0]} is the topic name and {@code args[1]} is the path of
 * the file whose lines are sent as message values.
 */
public class NaiveKafkaProducer {
// Producer configuration; filled in once by the static initializer below.
private static final Properties properties = new Properties();
// Shared producer instance; created by the static initializer.
private static Producer<String, String> producer;
// Destination topic, assigned from args[0] in main.
private static String topic;
// Reader over the input file named by args[1]; remains null if main receives no arguments.
private static BufferedReader br;
// Builds the configuration and creates the producer when the class is loaded.
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// NOTE(review): "request.required.acks" looks like the legacy producer setting; the Java
// client documents the key as "acks" -- confirm that this entry is actually honored.
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
/**
 * Reads the input file line by line and sends each line to the topic with an empty-string key.
 *
 * <p>The "resource" warning is suppressed; {@code br} is not closed anywhere in this method.
 *
 * @param args {@code args[0]} = topic name, {@code args[1]} = input file path
 * @throws IOException if reading a line from the file fails
 */
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
try {
if(args.length != 0) {
topic = args[0];
File file = new File(args[1]);
br = new BufferedReader((Reader) new FileReader(file));
}
} catch (Exception e) {
// The failure is only reported; execution continues to the loop below, where br may
// still be null (which would raise a NullPointerException on readLine()).
System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
e.printStackTrace();
}
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("Message to publish : " + msg);
System.out.println("Topic : " + topic);
// The value returned by send() is discarded, so the outcome of the send is never observed.
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
// NOTE(review): neither producer.flush() nor producer.close() is called before main returns,
// so records handed to send() may never reach the broker -- confirm against the
// KafkaProducer documentation.
return;
}
}
令人惊讶的是,以下代码有效(我在其中对所有内容进行了硬编码):
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
public static void main(String[] args) throws IOException {
try {
String[] msgs = new String[2];
msgs[0] = "message 1";
msgs[1] = "message 2";
topic = "mytopic"
for(String msg:msgs){
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
producer.close();
} catch (Exception e) {
System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
e.printStackTrace();
}
}
}
第二个代码段中调用了一个关键方法,而第一个代码段中缺少这个方法调用:
producer.close();
来自该方法的文档:
Close this producer. This method blocks until all previously sent requests complete.
当您调用 send 方法时,并不意味着消息已经真正发送出去。该方法会返回一个 Future。
您可以对每次 send 调用返回的 Future 调用 get(),以等待对应的消息发送完成。
我正在尝试使用 Java 编写一个简单的 Kafka Producer。该应用程序接受两个输入:
- 要向其生成消息的 Kafka 主题名称
- 包含要生成给 Kafka 的消息的文件路径
我写了下面的代码。当我运行它时,可以看到 System.out.println 语句打印出了预期的值,
但由于某种原因,消息并没有被发送到 Kafka。我应该修改什么才能让它正常工作?
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Command-line Kafka producer that publishes every line of a text file to a topic.
 *
 * <p>Expected arguments: {@code args[0]} is the topic name and {@code args[1]} is the path of
 * the file whose lines are sent as message values.
 */
public class NaiveKafkaProducer {
// Producer configuration; filled in once by the static initializer below.
private static final Properties properties = new Properties();
// Shared producer instance; created by the static initializer.
private static Producer<String, String> producer;
// Destination topic, assigned from args[0] in main.
private static String topic;
// Reader over the input file named by args[1]; remains null if main receives no arguments.
private static BufferedReader br;
// Builds the configuration and creates the producer when the class is loaded.
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// NOTE(review): "request.required.acks" looks like the legacy producer setting; the Java
// client documents the key as "acks" -- confirm that this entry is actually honored.
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
/**
 * Reads the input file line by line and sends each line to the topic with an empty-string key.
 *
 * <p>The "resource" warning is suppressed; {@code br} is not closed anywhere in this method.
 *
 * @param args {@code args[0]} = topic name, {@code args[1]} = input file path
 * @throws IOException if reading a line from the file fails
 */
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
try {
if(args.length != 0) {
topic = args[0];
File file = new File(args[1]);
br = new BufferedReader((Reader) new FileReader(file));
}
} catch (Exception e) {
// The failure is only reported; execution continues to the loop below, where br may
// still be null (which would raise a NullPointerException on readLine()).
System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
e.printStackTrace();
}
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("Message to publish : " + msg);
System.out.println("Topic : " + topic);
// The value returned by send() is discarded, so the outcome of the send is never observed.
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
// NOTE(review): neither producer.flush() nor producer.close() is called before main returns,
// so records handed to send() may never reach the broker -- confirm against the
// KafkaProducer documentation.
return;
}
}
令人惊讶的是,以下代码有效(我在其中对所有内容进行了硬编码):
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
public static void main(String[] args) throws IOException {
try {
String[] msgs = new String[2];
msgs[0] = "message 1";
msgs[1] = "message 2";
topic = "mytopic"
for(String msg:msgs){
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
producer.close();
} catch (Exception e) {
System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
e.printStackTrace();
}
}
}
第二个代码段中调用了一个关键方法,而第一个代码段中缺少这个方法调用:
producer.close();
来自该方法的文档:
Close this producer. This method blocks until all previously sent requests complete.
当您调用 send 方法时,并不意味着消息已经真正发送出去。该方法会返回一个 Future。
您可以对每次 send 调用返回的 Future 调用 get(),以等待对应的消息发送完成。