rdd 动作将在 DStream foreachRDD 函数中暂停
rdd action will be suspended in DStream foreachRDD function
我遇到错误:rdd 操作将在 DStream foreachRDD 函数中暂停。
请参考以下代码
import _root_.kafka.common.TopicAndPartition
import _root_.kafka.message.MessageAndMetadata
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object StreamingTest {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topicOffset = Map(TopicAndPartition("test_log",0)->200000L)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message
val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,String](ssc,kafkaParams,topicOffset,messageHandler)
kafkaStream.foreachRDD(rdd=>{
println(rdd.count())
val collected = rdd.collect()
})
ssc.start()
ssc.awaitTermination()
}
}
错误:
函数rdd.count()
或rdd.collect()
将被暂停。
我使用的spark
版本是1.4.1.
我是不是用错了?
提前致谢。
如果我们没有设置 kafka 的 maxRatePerPartition,它会尝试读取所有数据,所以它看起来像是挂起的。但它实际上忙于读取数据。
我设置如下配置后
spark.streaming.kafka.maxRatePerPartition=1000
它将打印日志。
我遇到错误:rdd 操作将在 DStream foreachRDD 函数中暂停。
请参考以下代码
import _root_.kafka.common.TopicAndPartition
import _root_.kafka.message.MessageAndMetadata
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object StreamingTest {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topicOffset = Map(TopicAndPartition("test_log",0)->200000L)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message
val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,String](ssc,kafkaParams,topicOffset,messageHandler)
kafkaStream.foreachRDD(rdd=>{
println(rdd.count())
val collected = rdd.collect()
})
ssc.start()
ssc.awaitTermination()
}
}
错误:
函数rdd.count()
或rdd.collect()
将被暂停。
我使用的spark
版本是1.4.1.
我是不是用错了?
提前致谢。
如果我们没有设置 kafka 的 maxRatePerPartition,它会尝试读取所有数据,所以它看起来像是挂起的。但它实际上忙于读取数据。
我设置如下配置后
spark.streaming.kafka.maxRatePerPartition=1000
它将打印日志。