Kafka Streams API 使用来自流配置的不同消费者 bootstrap 服务器

Kafka Streams API using different consumer bootstrap server from streams configuration

我正在学习 O'Reiley 的 KafkaStreams 课程,但在 Java 中遇到问题 运行 KafkaStreams v2.0.0;我的代码和属性配置如下:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    // 1 - stream from Kafka

    KStream<String, String> textLines = builder.stream("word-count-input");
    KTable<String, Long> wordCounts = textLines
            // 2 - map values to lowercase
            .mapValues(textLine -> textLine.toLowerCase())
            // can be alternatively written as:
            // .mapValues(String::toLowerCase)
            // 3 - flatmap values split by space
            .flatMapValues(textLine -> Arrays.asList(textLine.split("\W+")))
            // 4 - select key to apply a key (we discard the old key)
            .selectKey((key, word) -> word)
            // 5 - group by key before aggregation
            .groupByKey()
            // 6 - count occurrences
            .count("Counts");

    // 7 - to in order to write the results back to kafka
    wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

KafkaAdminClient 能够连接到 "otherhost:9092" 并且应用程序运行没有问题,但奇怪的是我收到有关消费者客户端尝试连接到不存在的 kafka localhost 实例而不是 [=23= 的错误]:

WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748) 

请注意,消费者正在尝试连接到 "localhost/127.0.0.1:9092" 而不是 otherhost:9092,为什么?因此,我的流媒体应用程序将无法运行。我很困惑,因为 Kafka Streams 文档 (https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#bootstrap-servers) 明确指出:"The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092"。"

我做错了什么?谢谢

发送给客户的地址 return 由代理上的 advertised.listeners 地址设置。

您的代码中给出的地址只是初始 bootstrap 连接。

您需要编辑代理设置以确保您 return 外部可解析的 "advertised" 地址,然后重新启动它