为什么当我启动 spark stream 时,kafka 消费者代码会冻结?
Why does the kafka consumer code freeze when I start spark stream?
我是 Kafka 的新手,正在尝试在 spark2 中实现 Kafka 消费者逻辑,当我 运行 我在 shell 中的所有代码并开始流式传输时,它什么也没显示。
我在 Whosebug 中查看了很多帖子,但没有任何帮助。我什至已经从 maven 下载了所有依赖 jar 并尝试 运行 但它仍然没有显示任何内容。
星火版本:2.2.0
斯卡拉版本 2.11.8
我下载的罐子是 kafka-clients-2.2.0.jar 和 spark-streaming-kafka-0-10_2.11-2.2.0.jar
但我仍然面临同样的问题。
请找到下面的代码片段
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}
val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
val msg = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)
msg.foreachRDD{
rdd => rdd.collect().foreach(println)
}
ssc.start()
我期待 SparkStreaming 启动,但它没有执行任何操作。我在这里犯了什么错误?或者这是一个已知问题?
driver 将闲置,除非您在最后调用 ssc.awaitTermination()
。如果您使用的是 spark-shell 那么它不是流式传输作业的好工具。
请使用 Zeppelin 或 Spark notebook 等交互式工具与流媒体进行交互,或者尝试将您的应用构建为 jar 文件,然后进行部署。
此外,如果您正在尝试 Spark Streaming,Structured Streaming 会更好,因为它很容易玩。
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- 在
ssc.start()
之后,在您的代码中使用 ssc.awaitTermination()
。
- 为了测试,将您的代码写入 Main Object 并 运行 将其写入任何 IDE 中,例如 Intellij
- 您可以使用命令 shell 并从 Kafka 生产者发布消息。
我在博客 post 中的一个简单示例中编写了所有这些步骤,工作代码在 GitHub 中。请参考:http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html
我是 Kafka 的新手,正在尝试在 spark2 中实现 Kafka 消费者逻辑,当我 运行 我在 shell 中的所有代码并开始流式传输时,它什么也没显示。
我在 Whosebug 中查看了很多帖子,但没有任何帮助。我什至已经从 maven 下载了所有依赖 jar 并尝试 运行 但它仍然没有显示任何内容。
星火版本:2.2.0 斯卡拉版本 2.11.8 我下载的罐子是 kafka-clients-2.2.0.jar 和 spark-streaming-kafka-0-10_2.11-2.2.0.jar
但我仍然面临同样的问题。
请找到下面的代码片段
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}
val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
val msg = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)
msg.foreachRDD{
rdd => rdd.collect().foreach(println)
}
ssc.start()
我期待 SparkStreaming 启动,但它没有执行任何操作。我在这里犯了什么错误?或者这是一个已知问题?
driver 将闲置,除非您在最后调用 ssc.awaitTermination()
。如果您使用的是 spark-shell 那么它不是流式传输作业的好工具。
请使用 Zeppelin 或 Spark notebook 等交互式工具与流媒体进行交互,或者尝试将您的应用构建为 jar 文件,然后进行部署。
此外,如果您正在尝试 Spark Streaming,Structured Streaming 会更好,因为它很容易玩。
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- 在
ssc.start()
之后,在您的代码中使用ssc.awaitTermination()
。 - 为了测试,将您的代码写入 Main Object 并 运行 将其写入任何 IDE 中,例如 Intellij
- 您可以使用命令 shell 并从 Kafka 生产者发布消息。
我在博客 post 中的一个简单示例中编写了所有这些步骤,工作代码在 GitHub 中。请参考:http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html