无法从 Java 连接到 Docker 中的 Kafka 运行

Unable to Connect from Java to Kafka running in Docker

尝试使用 Debezium 将 MySql 数据库流式传输到 Kafka。

因此,在 Docker 容器中,我启动了 Zookeeper、Kafka、MySQL 数据库、MySQL 命令行和 Kafka Connect。

当我 运行 在 MySQL 命令行中执行任何 DML 命令时,我可以在 docker 中启动的观察器 window 中看到更改事件].所以目前一切看起来都很好。请在下面找到相同的内容。

现在,我正在尝试使用 Java 代码中的更改事件,每当我在 MySQL 命令行中执行任何 DML 命令时,这些代码都可以在观察程序 window 中看到.请在下面找到消费者。

            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
            ArrayList<String> topics = new ArrayList<>();
            topics.add("dbserver1.inventory.customers");
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Message received: " + record.value());
                }
                consumer.commitAsync();
            }

无法使用来自上述消费者的数据更改事件。如果有什么需要做的,请告诉我。

我通过如下更改 BOOTSTRAP_SERVERS_CONFIG 的 属性 解决了这个问题,现在工作正常。能够消费数据变化。

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

并使用数据更改

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(1000L);
  for (ConsumerRecord<String, String> record : records) {
    String payload = record.value();
    if(payload.contains("payload")) {
    JsonObject root = (JsonObject) new JsonParser().parse(payload);
    root = root.getAsJsonObject("payload").getAsJsonObject("after");
   }
  }
  consumer.commitAsync();
}

它对我有用。 :)

有效与无效的区别在于您访问的侦听器端口。

所以要么:

  • 29092 错了,9092 正确
  • 29092 绑定到 Docker 网络内部的广告侦听器,9092 绑定到 localhost (learn more)