Spark streaming: ERROR StreamingContext: failed to construct kafka consumer
I am trying to access a Kafka topic using Spark Streaming. I don't think I'm missing any dependencies or imports, but when I try to run my code, shown below:
public static void main(String[] args) {
    String URL = "spark://localhost:7077";
    SparkConf conf = new SparkConf().setAppName("Kafka-test").setMaster(URL);
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:6667");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "ID1");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("MAX_LEGO", "CanBeDeleted");
    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    JavaPairDStream<Object, Object> max = stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
    max.count();
    max.print();
    ssc.start();
}
I get this error message:
18/02/10 16:57:08 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at org.kafkaConnection2.main(kafkaConnection2.java:50)
Caused by: org.apache.kafka.common.KafkaException: com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
My assumption is that the bootstrap servers may be set incorrectly, or that there is some other problem connecting to Kafka...
Please bear with what is probably a rather silly question, but I'm just getting started with Spark and Kafka.
Try using org.apache.kafka.common.serialization.StringDeserializer instead of com.fasterxml.jackson.databind.deser.std.StringDeserializer, because you are getting the following exception:
Caused by: org.apache.kafka.common.KafkaException: com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:536)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
at ...
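For reference, here is a minimal sketch of the corrected setup. It assumes the same broker address, group id, and topics as in your question; the only substantive change is the import, plus an ssc.awaitTermination() call so the driver does not exit immediately after ssc.start():

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
// Kafka's deserializer -- not com.fasterxml.jackson.databind.deser.std.StringDeserializer
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class KafkaTest {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("Kafka-test").setMaster("spark://localhost:7077");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:6667");
        // With the import above, StringDeserializer.class now refers to a class that
        // implements org.apache.kafka.common.serialization.Deserializer, so the
        // "Failed to construct kafka consumer" exception goes away.
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "ID1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("MAX_LEGO", "CanBeDeleted");
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaPairDStream<String, String> pairs =
                stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        pairs.print();

        ssc.start();
        ssc.awaitTermination(); // keep the driver alive; without this, main() returns right after start()
    }
}

As for your bootstrap-server hypothesis: localhost:6667 can be fine (it is the default on some distributions such as HDP), though stock Kafka listens on 9092 by default. Either way, a wrong broker address would not produce this particular exception, which is thrown before any connection attempt.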