将来自 kafkaStream 的字符串存储到一个变量中进行处理

storing the string from kafkaStream into a variable for processing

我需要从 Kafka 生产者那里获取消息,我需要从消息中找到包含 % 的单词并为不同的 % 值生成一条消息。最后我需要将它发送到 ElasticSearch。

我可以使用 kafkaStream.print() 在控制台中查看值,但我需要处理字符串以匹配所需的关键字并生成消息。

我的代码:

package rnd

import org.apache.spark.SparkConf
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("KafkaReceiver")
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds

    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(10))

    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val s = kafkaStream.print()
    println(" the words are: " + s)
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)
    ssc
    ssc.start()
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

如果我通过 Lafka 生产者传递 "The usage is 75%",我应该在 ElasticSearch 中生成一条消息说 "Increase ram by 25%"。

我得到的输出是:

18/02/09 16:38:27 INFO BlockManagerMasterEndpoint: Registering block manager localhost:37879 with 2.4 GB RAM, BlockManagerId(driver, localhost, 37879)
18/02/09 16:38:27 INFO BlockManagerMaster: Registered BlockManager
18/02/09 16:38:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
 ***the words are: ()***

我想要传递的字符串代替 's' 中的 ()。

val kafkaStream 是一个 RecieverInputDStream[(String, String)],其中数据是 (kafkaMetaData, kafkaMessage) 有关详细信息,请参阅 [https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L135]。

我们需要提取元组的第二个并进行模式匹配(即过滤器 RecieverInputDStream 找到包含 % 的单词),然后使用 map 生成输出(即不同 % 值的消息)。正如@stefanobaghino 所提到的,print() 函数只是将输出打印到控制台,而不是 return 任何记录字符串。

例如:

import org.apache.spark.streaming.dstream.ReceiverInputDStream
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(sparkStreamingContext, "localhost:2181",
  "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

import org.apache.spark.streaming.dstream.DStream
val filteredStream: DStream[(String, String)] = kafkaStream
  .filter(record => record._2.contains("%")) // TODO : pattern matching here

val outputDStream: DStream[String] = filteredStream
  .map(record => record._2.toUpperCase()) // just assuming some operation
outputDStream.print()

使用outputDStream写入ElasticSearch。希望这有帮助。