服务器不接受 Kafka 消息(仅来自远程 IP?)
Kafka messages not accepted by server (only from remote IP?)
我的 Kafka 服务器工作正常,如果我从 C++ 应用程序或从本地计算机的命令行生成和使用。
但它不能很好地从外部 IP 地址运行:主题被创建,我看到网络流量使用 tcpdump 和 ACK(意味着 kafka 正在回答),但我发现队列中没有消息,并且 java 没有给出错误。在日志中找不到任何内容。或者 google。这是我的应用程序:
public class KafkaPBJProducer {
private static String KAFKA_TOPIC = "test";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.131:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("kafka.topic", KAFKA_TOPIC);
runMainLoop(properties);
}
static void runMainLoop(Properties properties) {
@SuppressWarnings("resource")
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int count = 0; count < 10; count++) {
String msg = getMsg(String.valueOf(count));
System.out.println("Producing message: " + msg);
producer.send(new ProducerRecord<String, String>(KAFKA_TOPIC, 0, "dev-" + count, msg));
producer.flush();
}
}
public static String getMsg(String id) {
JsonObject obj = new JsonObject();
try {
obj.addProperty("id", id);
obj.addProperty("timestamp", new Timestamp(System.currentTimeMillis()).toString());
obj.addProperty("data", Base64.getEncoder().encodeToString("Hello, World!".getBytes("utf-8")));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new Gson().toJson(obj);
}
}
当它运行时,它会在每条消息上卡住一分钟(没有显示错误,只是需要很长时间才能发送):
Producing message: {"id":"0","timestamp":"2018-08-21 13:49:10.697","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"1","timestamp":"2018-08-21 13:50:10.794","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"2","timestamp":"2018-08-21 13:51:10.797","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"3","timestamp":"2018-08-21 13:52:10.813","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
我可以看到主题已创建:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
我可以使用命令行手动向主题添加内容:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>one
>two
>four
但是如果我查看主题,只列出手动发送的消息:
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.131:9092 --topic test --from-beginning
one
two
four
(after several minutes, I press CTRL-C)
^CProcessed a total of 3 messages
这里发生了什么?为什么创建了主题但服务器上不接受任何消息?为什么kafka不报错?
TIA!
找到。基于,问题出在配置文件conf/servers.properties;它需要取消注释这一行:
advertised.listeners=PLAINTEXT://192.168.1.131:9092
谢谢。
我的 Kafka 服务器工作正常,如果我从 C++ 应用程序或从本地计算机的命令行生成和使用。
但它不能很好地从外部 IP 地址运行:主题被创建,我看到网络流量使用 tcpdump 和 ACK(意味着 kafka 正在回答),但我发现队列中没有消息,并且 java 没有给出错误。在日志中找不到任何内容。或者 google。这是我的应用程序:
public class KafkaPBJProducer {
private static String KAFKA_TOPIC = "test";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.131:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("kafka.topic", KAFKA_TOPIC);
runMainLoop(properties);
}
static void runMainLoop(Properties properties) {
@SuppressWarnings("resource")
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int count = 0; count < 10; count++) {
String msg = getMsg(String.valueOf(count));
System.out.println("Producing message: " + msg);
producer.send(new ProducerRecord<String, String>(KAFKA_TOPIC, 0, "dev-" + count, msg));
producer.flush();
}
}
public static String getMsg(String id) {
JsonObject obj = new JsonObject();
try {
obj.addProperty("id", id);
obj.addProperty("timestamp", new Timestamp(System.currentTimeMillis()).toString());
obj.addProperty("data", Base64.getEncoder().encodeToString("Hello, World!".getBytes("utf-8")));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new Gson().toJson(obj);
}
}
当它运行时,它会在每条消息上卡住一分钟(没有显示错误,只是需要很长时间才能发送):
Producing message: {"id":"0","timestamp":"2018-08-21 13:49:10.697","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"1","timestamp":"2018-08-21 13:50:10.794","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"2","timestamp":"2018-08-21 13:51:10.797","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"3","timestamp":"2018-08-21 13:52:10.813","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
我可以看到主题已创建:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
我可以使用命令行手动向主题添加内容:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>one
>two
>four
但是如果我查看主题,只列出手动发送的消息:
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.131:9092 --topic test --from-beginning
one
two
four
(after several minutes, I press CTRL-C)
^CProcessed a total of 3 messages
这里发生了什么?为什么创建了主题但服务器上不接受任何消息?为什么kafka不报错?
TIA!
找到。基于
advertised.listeners=PLAINTEXT://192.168.1.131:9092
谢谢。