How to turn this simple Spark Streaming code into a multi-threaded one?
I'm learning Kafka with Scala. The attached code is just a word count implementation using Kafka and Spark Streaming.
How can I have a separate consumer execute on each partition while streaming? Please help!
Here's my code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class ConsumerM(topics: String, bootstrap_server: String, group_name: String) {
  Logger.getLogger("org").setLevel(Level.ERROR)

  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    .setMaster("local[*]")
    .set("spark.executor.memory", "1g")
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val topicsSet = topics.split(",")
  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap_server,
    ConsumerConfig.GROUP_ID_CONFIG -> group_name,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest")

  // The direct stream maps Spark partitions 1:1 to the Kafka topic's partitions
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}
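For reference, since the class body starts the whole job on construction, it could be launched from a plain entry point like the one below; the topic name, broker address, and group id are placeholder values, not from the original post:

object Main {
  def main(args: Array[String]): Unit = {
    // Constructing the class starts the streaming job and blocks in awaitTermination()
    new ConsumerM("wordcount-input", "localhost:9092", "wordcount-group")
  }
}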
Assuming your input topic has more than one partition, then additionally setting local[*]
means you will have one Spark executor per CPU core, and at least one partition can be consumed by each.
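If you want to see that per-partition parallelism in action, a minimal sketch is to replace wordCounts.print() with per-partition processing of the `messages` stream above, following the offset-range pattern documented for spark-streaming-kafka-0-10:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

messages.foreachRDD { rdd =>
  // Each RDD from createDirectStream carries one offset range per Kafka partition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { records =>
    // One Spark task per Kafka partition; look up the range this task owns
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} partition ${o.partition} offsets ${o.fromOffset}-${o.untilOffset}")
    records.foreach(record => println(s"  ${record.value}"))
  }
}

With local[*] and enough cores, Spark schedules those partition tasks concurrently, which is the multi-threading the direct stream already gives you.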