How to turn this simple Spark Streaming code into a multi-threaded one?

I'm learning Kafka with Scala. The attached code is just a word-count implementation using Kafka and Spark Streaming. How can I have a separate consumer executing on each partition while streaming? Please help!

Here's my code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class ConsumerM(topics: String, bootstrap_server: String, group_name: String) {

  Logger.getLogger("org").setLevel(Level.ERROR)

  // local[*] runs one worker thread per CPU core on this machine.
  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    .setMaster("local[*]")
    .set("spark.executor.memory", "1g")

  // Micro-batches of one second.
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val topicsSet = topics.split(",").toSet

  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap_server,
    ConsumerConfig.GROUP_ID_CONFIG -> group_name,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")

  // The direct stream creates one Spark partition per Kafka partition.
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

  // Classic word count over each one-second micro-batch.
  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}
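For reference, a minimal sketch of how I run it; the topic name, broker address, and group id below are placeholders:

object ConsumerMApp {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; replace with your topic, broker, and consumer group.
    // The class starts streaming in its constructor, so instantiating it
    // blocks here on awaitTermination().
    new ConsumerM("wordcount-input", "localhost:9092", "wordcount-group")
  }
}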

Assuming your input topic has multiple partitions, then additionally setting local[*] means you will have one Spark executor per CPU core, and each executor can consume from at least one partition.
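In other words, the direct stream already gives you a 1:1 mapping between Kafka partitions and Spark partitions, so the parallelism you want comes without any extra threading code. A minimal sketch to verify this, assuming the `messages` stream from the question (it inspects the batch's offset ranges via the kafka010 `HasOffsetRanges` API and must be registered before `ssc.start()`):

import org.apache.spark.streaming.kafka010.HasOffsetRanges

// Log which Kafka partition each Spark partition of the batch corresponds to.
messages.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  println(s"Spark partitions in this batch: ${rdd.getNumPartitions}")
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} offsets=[${r.fromOffset}, ${r.untilOffset})")
  }
}

With an input topic that has, say, four partitions, you should see four offset ranges per batch, each handled by its own task in parallel.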