Kafka Streams API 使用来自流配置的不同消费者 bootstrap 服务器
Kafka Streams API using different consumer bootstrap server from streams configuration
我正在学习 O'Reiley 的 KafkaStreams 课程,但在 Java 中遇到问题 运行 KafkaStreams v2.0.0;我的代码和属性配置如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
// 1 - stream from Kafka
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
// 2 - map values to lowercase
.mapValues(textLine -> textLine.toLowerCase())
// can be alternatively written as:
// .mapValues(String::toLowerCase)
// 3 - flatmap values split by space
.flatMapValues(textLine -> Arrays.asList(textLine.split("\W+")))
// 4 - select key to apply a key (we discard the old key)
.selectKey((key, word) -> word)
// 5 - group by key before aggregation
.groupByKey()
// 6 - count occurrences
.count("Counts");
// 7 - to in order to write the results back to kafka
wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
KafkaAdminClient 能够连接到 "otherhost:9092" 并且应用程序运行没有问题,但奇怪的是我收到有关消费者客户端尝试连接到不存在的 kafka localhost 实例而不是 [=23= 的错误]:
WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)
请注意,消费者正在尝试连接到 "localhost/127.0.0.1:9092" 而不是 otherhost:9092,为什么?因此,我的流媒体应用程序将无法运行。我很困惑,因为 Kafka Streams 文档 (https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#bootstrap-servers) 明确指出:"The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092"。"
我做错了什么?谢谢
发送给客户的地址 return 由代理上的 advertised.listeners
地址设置。
您的代码中给出的地址只是初始 bootstrap 连接。
您需要编辑代理设置以确保您 return 外部可解析的 "advertised" 地址,然后重新启动它
我正在学习 O'Reiley 的 KafkaStreams 课程,但在 Java 中遇到问题 运行 KafkaStreams v2.0.0;我的代码和属性配置如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
// 1 - stream from Kafka
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
// 2 - map values to lowercase
.mapValues(textLine -> textLine.toLowerCase())
// can be alternatively written as:
// .mapValues(String::toLowerCase)
// 3 - flatmap values split by space
.flatMapValues(textLine -> Arrays.asList(textLine.split("\W+")))
// 4 - select key to apply a key (we discard the old key)
.selectKey((key, word) -> word)
// 5 - group by key before aggregation
.groupByKey()
// 6 - count occurrences
.count("Counts");
// 7 - to in order to write the results back to kafka
wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
KafkaAdminClient 能够连接到 "otherhost:9092" 并且应用程序运行没有问题,但奇怪的是我收到有关消费者客户端尝试连接到不存在的 kafka localhost 实例而不是 [=23= 的错误]:
WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)
请注意,消费者正在尝试连接到 "localhost/127.0.0.1:9092" 而不是 otherhost:9092,为什么?因此,我的流媒体应用程序将无法运行。我很困惑,因为 Kafka Streams 文档 (https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#bootstrap-servers) 明确指出:"The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092"。"
我做错了什么?谢谢
发送给客户的地址 return 由代理上的 advertised.listeners
地址设置。
您的代码中给出的地址只是初始 bootstrap 连接。
您需要编辑代理设置以确保您 return 外部可解析的 "advertised" 地址,然后重新启动它