Naive Kafka Producer not working when I try to produce messages that are read from a file

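I am trying to write a simple Kafka Producer in Java. The application takes two inputs:

  1. The name of the Kafka topic to produce messages to
  2. The path of the file containing the messages to produce to Kafka

I wrote the code below. When I run it, the System.out.println statements print the expected values, but for some reason no messages are produced to Kafka. What should I change to make it work?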

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws IOException {
        try {
            if(args.length != 0) {
                topic = args[0];
                File file = new File(args[1]);
                br = new BufferedReader((Reader) new FileReader(file));
            }
        } catch (Exception e) {
            System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
            e.printStackTrace();
        }

        String msg;
        while ((msg = br.readLine()) != null) {
            System.out.println("Message to publish : " + msg);
            System.out.println("Topic : " + topic);
            producer.send(new ProducerRecord<String, String>(topic, "", msg));
        }
        return;
    }
}

Surprisingly, the following code works (I hard-coded everything in it):

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws IOException {
            try {
                String[] msgs = new String[2];
                msgs[0] = "message 1";
                msgs[1] = "message 2";
                topic = "mytopic";
                for(String msg:msgs){
                    producer.send(new ProducerRecord<String, String>(topic, "", msg));
                }
                producer.close();
            } catch (Exception e) {
                System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
                e.printStackTrace();
            }
    }
}

The second snippet calls one crucial method that is missing from the first:

producer.close();

From that method's documentation:

Close this producer. This method blocks until all previously sent requests complete.

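Calling send does not mean the message has actually been produced. The method returns a Future, and the record is only handed off to be sent in the background. If main returns before that happens, the messages are lost. You can wait for each message to be produced by calling get() on the result of each send call, or call producer.close() once after the loop.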
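
To make the two ways of waiting concrete, here is a small model that uses only the JDK. It is a sketch, not Kafka's implementation: FakeProducer, sendThenClose and sendAndWaitEach are hypothetical names invented for this illustration, and a single-threaded executor stands in for the producer's background sender. The shape is the same as with the real client, though: send returns a Future right away, get() blocks until that one message is delivered, and close() blocks until everything queued so far is delivered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BufferedSendSketch {

    /** Hypothetical stand-in for an asynchronous producer (not the Kafka client). */
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService sender = Executors.newSingleThreadExecutor();
        private final List<String> delivered =
                Collections.synchronizedList(new ArrayList<>());

        /** Queues the message and returns immediately; delivery happens on the sender thread. */
        Future<Integer> send(String message) {
            return sender.submit(() -> {
                delivered.add(message);
                return delivered.size() - 1; // index at which the message was stored
            });
        }

        /** Stops accepting new messages and blocks until the queued ones are delivered. */
        @Override
        public void close() {
            sender.shutdown();
            try {
                sender.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Copy of what has been delivered so far. */
        List<String> deliveredSnapshot() {
            synchronized (delivered) {
                return new ArrayList<>(delivered);
            }
        }
    }

    /** Option 1: fire off every send, then let close() wait for all of them. */
    static List<String> sendThenClose(List<String> messages) {
        FakeProducer producer = new FakeProducer();
        for (String message : messages) {
            producer.send(message);
        }
        producer.close();
        return producer.deliveredSnapshot();
    }

    /** Option 2: block on each Future, so every message is delivered before the next send. */
    static List<String> sendAndWaitEach(List<String> messages)
            throws InterruptedException, ExecutionException {
        try (FakeProducer producer = new FakeProducer()) {
            for (String message : messages) {
                producer.send(message).get();
            }
            return producer.deliveredSnapshot();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> messages = Arrays.asList("message 1", "message 2");
        System.out.println(sendThenClose(messages));
        System.out.println(sendAndWaitEach(messages));
    }
}
```

Applied to the first program in the question, option 1 corresponds to calling producer.close() after the while loop, and option 2 to calling get() on the value returned by each producer.send(...). Waiting on every Future serializes the sends, so closing once at the end is usually the cheaper choice when per-message confirmation is not needed.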