Spark Streaming Kafka stream
I'm running into some problems reading data from Kafka with Spark Streaming.
My code is:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"kafka.auto.offset.reset" -> "smallest"
)

val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
I had already started ZooKeeper on port 2181 and a Kafka 0.9.0.0 server on port 9092, but I get the following error in the Spark driver:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply$$anonfun$apply.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply.apply(KafkaCluster.scala:87)
ZooKeeper log:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
Any hints?
Thanks a lot.
The problem was related to the wrong spark-streaming-kafka version.
As stated in the documentation: "Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1"
So, including
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>
in my pom.xml (instead of version 0.9.0.0) solved the problem.
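If you build with sbt rather than Maven, the equivalent dependency lines would look roughly like this (a sketch assuming Spark 1.5.2 on Scala 2.10; adjust the versions to match your build):

// Illustrative sbt equivalent of the Maven fix above (versions assume Spark 1.5.2 / Scala 2.10)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.5.2",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
  "org.apache.kafka" % "kafka_2.10" % "0.8.2.2"  // instead of 0.9.0.0
)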
Hope it helps.
Kafka 0.10 streaming / Spark 2.1.0 / DCOS / Mesosphere
Ugh, this cost me a whole day and I must have read this post a dozen times. I tried Spark 2.0.0, 2.0.1, Kafka 8, and Kafka 10. Stay away from Kafka 8 and Spark 2.0.x; the dependencies are everything. Start with the setup below. It works.
SBT:
"org.apache.hadoop" % "hadoop-aws" % "2.7.3" excludeAll ExclusionRule(organization = "org.apache.hadoop", name = "hadoop-common"),
"org.apache.spark" %% "spark-core" % "2.1.0",
"org.apache.spark" %% "spark-sql" % "2.1.0" ,
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0",
"org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
Working Kafka/Spark Streaming code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val spark = SparkSession
  .builder()
  .appName("ingest")
  .master("local[4]")
  .getOrCreate()

import spark.implicits._

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
val topics = Set("water2").toSet
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker:port,broker:port",  // legacy old-consumer setting; the 0.10 consumer only uses bootstrap.servers
  "bootstrap.servers" -> "broker:port,broker:port",
  "group.id" -> "somegroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset" -> "earliest",  // start from the earliest offset when no committed offset exists
  "enable.auto.commit" -> "true"
)
val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
messages.foreachRDD(rdd => {
  if (rdd.count() >= 1) {
    // Keep only the message values and show a sample of each batch
    rdd.map(record => (record.key, record.value))
      .toDS()
      .withColumnRenamed("_2", "value")
      .drop("_1")
      .show(5, false)
    println(rdd.getClass)
  }
})
ssc.start()
ssc.awaitTermination()
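If you only need to look at the raw values, a simpler variant (a sketch reusing the same stream and imports, skipping the Spark SQL conversion) is:

// Print a sample (first 10 elements) of each batch's values on the driver
messages.map(record => record.value).print()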
If you see this, please upvote so I can get some reputation points. :)