从命令提示符启动 Kafka 使用者时,我收到 Class Not Found Exception。我该如何解决这个问题?
I'm getting Class Not Found Exception on launching Kafka consumer from command prompt. How do I resolve this?
我正在使用 Kafka-streams 执行字数统计问题。 Kafka 使用的版本是 2.3.1,创建了两个主题 word-count-input 和 word-count-output。相同的代码是
public static void main( String[] args )
{
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines.mapValues(value -> value.toLowerCase())
.flatMapValues(value -> Arrays.asList(value.split(" ")))
.selectKey((key,value) -> value)
.groupByKey()
.count();
// Writing back to the kafka topic
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
//printing the topology
System.out.println(streams.toString());
// shutdown hook to correctly close the stream application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
在使用命令提示符为主题字数统计输出启动消费者时,它会抛出 Class 未找到设置 key.deserializer 和 value.deserializer 属性的异常。
你可以看看命令提示符here。
问题出在Kafka版本不同。系统安装的版本是2.3.1,maven中安装的是2.1.1。希望对大家有帮助。
我正在使用 Kafka-streams 执行字数统计问题。 Kafka 使用的版本是 2.3.1,创建了两个主题 word-count-input 和 word-count-output。相同的代码是
public static void main( String[] args )
{
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines.mapValues(value -> value.toLowerCase())
.flatMapValues(value -> Arrays.asList(value.split(" ")))
.selectKey((key,value) -> value)
.groupByKey()
.count();
// Writing back to the kafka topic
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
//printing the topology
System.out.println(streams.toString());
// shutdown hook to correctly close the stream application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
在使用命令提示符为主题字数统计输出启动消费者时,它会抛出 Class 未找到设置 key.deserializer 和 value.deserializer 属性的异常。
你可以看看命令提示符here。
问题出在Kafka版本不同。系统安装的版本是2.3.1,maven中安装的是2.1.1。希望对大家有帮助。