Storing the string from kafkaStream into a variable for processing
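I need to get messages from a Kafka producer, find the words in each message that contain %, and generate a different message for each % value. Finally, I need to send the result to ElasticSearch.
I can see the values in the console with kafkaStream.print(), but I need to process the strings so I can match the required keywords and generate the message.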
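For the word-finding step on its own, a minimal sketch in plain Scala (no Spark) could split each message on whitespace and keep the tokens that contain %. The names PercentWords and wordsWithPercent are hypothetical, and splitting on whitespace is an assumption: punctuation attached to a token (for example "75%,") stays part of the word.

```scala
object PercentWords {
  // Splits on runs of whitespace and keeps only the tokens that contain '%'.
  def wordsWithPercent(message: String): List[String] =
    message.split("\\s+").toList.filter(_.contains("%"))

  def main(args: Array[String]): Unit =
    println(wordsWithPercent("The usage is 75%")) // prints List(75%)
}
```

The same function could later be applied to the message part of each Kafka record inside a DStream transformation.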
My code:
package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object WordFind {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("KafkaReceiver")
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))
    val s = kafkaStream.print()
    println(" the words are: " + s)
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}
If I pass "The usage is 75%" through the Kafka producer, I should generate a message in ElasticSearch saying "Increase ram by 25%".
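The example implies a rule that maps a usage percentage to a RAM recommendation. A minimal sketch, assuming the recommended increase is simply 100 minus the reported usage (the only data point given is 75% giving 25%, so the real rule may differ). RamAdvice and adviceFor are hypothetical names; the regex accepts 1 to 3 digits at a word boundary, and values above 100 are ignored.

```scala
object RamAdvice {
  // A number of 1 to 3 digits starting at a word boundary and followed by '%', e.g. "75%".
  private val Percent = """\b(\d{1,3})%""".r

  // ASSUMPTION: the advised increase is 100 minus the reported usage.
  def adviceFor(message: String): Option[String] =
    Percent.findFirstMatchIn(message)
      .map(_.group(1).toInt)
      .filter(_ <= 100) // values above 100 cannot be a usage share
      .map(usage => s"Increase ram by ${100 - usage}%")

  def main(args: Array[String]): Unit =
    println(adviceFor("The usage is 75%")) // prints Some(Increase ram by 25%)
}
```

Returning an Option means messages without a usable percentage simply produce no advice, which fits a filter-then-map (or flatMap) stream pipeline.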
The output I get is:
18/02/09 16:38:27 INFO BlockManagerMasterEndpoint: Registering block manager localhost:37879 with 2.4 GB RAM, BlockManagerId(driver, localhost, 37879)
18/02/09 16:38:27 INFO BlockManagerMaster: Registered BlockManager
18/02/09 16:38:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
the words are: ()
I want s to contain the string I passed in, instead of ().
val kafkaStream is a ReceiverInputDStream[(String, String)] whose records are (key, message) pairs: the Kafka message key and the message itself.
For details, see [https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L135].
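We need to take the second element of each tuple and match against it (here, filter the ReceiverInputDStream for messages containing %), then use map to produce the output (a message for each % value). As @stefanobaghino mentioned, print() only prints the output to the console; it returns Unit rather than any record strings, which is why s shows up as ().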
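To see both points without a Kafka or Spark setup, here is a Spark-free analogue that uses a plain Seq of (key, message) tuples in place of the ReceiverInputDStream. It is only an illustration: LocalPipeline and the sample records are hypothetical, and foreach(println) stands in for print() because both are called purely for their side effect and return Unit.

```scala
object LocalPipeline {
  // Same shape as the receiver stream's records: (message key, message value).
  // The key is null here, as it typically is when the producer does not set one.
  val records: Seq[(String, String)] = Seq(
    (null, "The usage is 75%"),
    (null, "all systems normal"),
    (null, "The usage is 40%")
  )

  // Same steps as the DStream version: filter on record._2, then map it to an output value.
  // These return a new collection, so the result can be kept in a val and processed further.
  val output: Seq[String] = records
    .filter(record => record._2.contains("%"))
    .map(record => record._2.toUpperCase)

  def main(args: Array[String]): Unit = {
    // foreach(println), like DStream.print(), runs only for its side effect and returns Unit.
    val s: Unit = output.foreach(println)
    println("the words are: " + s) // prints "the words are: ()"
  }
}
```

Running main prints the two upper-cased messages followed by "the words are: ()", which mirrors the output in the question: the data lives in the transformed collection (or DStream), not in the return value of the printing call.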
For example:
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

// Inside main, before ssc.start(); ssc is the StreamingContext from the question
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
  "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

val filteredStream: DStream[(String, String)] = kafkaStream
  .filter(record => record._2.contains("%")) // TODO: pattern matching here

val outputDStream: DStream[String] = filteredStream
  .map(record => record._2.toUpperCase()) // placeholder transformation

outputDStream.print()
You can then use outputDStream to write to ElasticSearch. Hope this helps.
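One possible route for the ElasticSearch step, assuming the elasticsearch-hadoop connector (the elasticsearch-spark artifact matching your Spark and Scala versions) is on the classpath, is its Spark Streaming support: after import org.elasticsearch.spark.streaming._, a DStream gains a saveToEs(resource) method (Map records are among the supported types), with the cluster address set through es.nodes on the SparkConf. Please check the connector documentation for your version. The sketch below covers only the Spark-free part, turning one (key, message) record into an optional document; EsDocs, toDocument, the field names, and the 100-minus-usage rule are all hypothetical.

```scala
object EsDocs {
  // A number of 1 to 3 digits at a word boundary followed by '%', e.g. "75%".
  private val Percent = """\b(\d{1,3})%""".r

  // Converts one (key, message) record into a document, or None if there is no usable
  // percentage. The key (record._1) is ignored; a null message yields None.
  def toDocument(record: (String, String)): Option[Map[String, String]] =
    Option(record._2).flatMap { message =>
      Percent.findFirstMatchIn(message)
        .map(_.group(1).toInt)
        .filter(_ <= 100)
        .map { usage =>
          Map(
            "source"  -> message,                            // original Kafka message
            "message" -> s"Increase ram by ${100 - usage}%" // ASSUMPTION: 100 - usage rule
          )
        }
    }

  def main(args: Array[String]): Unit = {
    println(toDocument((null, "The usage is 75%")))
    println(toDocument((null, "all systems normal"))) // prints None
  }
}
```

In the streaming job this might be wired roughly as kafkaStream.flatMap(record => EsDocs.toDocument(record)).saveToEs("usage-alerts/doc"), defined before ssc.start(), where "usage-alerts/doc" is a placeholder index. Returning an Option lets flatMap drop messages that carry no usable percentage, so a separate filter step is not needed.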