Spark Streaming on Kafka: print different messages for different values from Kafka

Here is my scenario: 10,000 servers are sending their DF (disk usage) data, so there are 10,000 inputs every 5 seconds.

If the DF usage of any server is above 70%, print "Increase ROM size by 20%". If the DF usage of any server is below 30%, print "Decrease ROM size by 25%".

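Below is code that reads messages from Kafka, keeps the ones containing "%", and applies toUpperCase() to them. It is included only as a reference for my Kafka setup.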

Can anyone help me with this?

package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
        val batchIntervalSeconds = 2

        val ssc = new StreamingContext(conf, Seconds(10))

        // Receiver-based stream: ZooKeeper quorum, consumer group, topic -> number of consumer threads
        val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

        // Keep only the messages whose value contains a percent sign
        val filteredStream: DStream[(String, String)] = kafkaStream.filter(record =>
                record._2.contains("%")) // TODO : pattern matching here
        val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase())

        outputDStream.print()

        ssc.start()
        // Wait for at most 5 * batchIntervalSeconds seconds
        ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
    }
}

Please help me write code that satisfies this scenario.

Sample input:

Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda1      132239776 6210884 119311504   5% /
tmpfs            4021876       0   4021876   0% /dev/shm

Sample output: if Use% > 70 for any row, message: Increase ROM size by 20%. If Use% < 30 for any row, message: Decrease ROM size by 25%.

I also have to write the result to Elasticsearch, and that gives an error:

package rnd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object WordFind {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val sc = new SparkContext(conf)
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem"))  // eliminate header
      .map(_._2.split("\\s+"))  // split on whitespace
    val outputDStream: DStream[String] = filteredStream.map {
      row =>
        val useIdx = row.length - 2
        // if Use%>70 for any case> Message: Increase ROM size by 20%
        // if Use%<30% for any case> Message: Decrease ROM size by 25%
        val usePercent = row(useIdx).replace("%", "").toInt
        usePercent match {
          case x if x > 70 => "Increase ROM size by 20%"
          case x if x < 30 => "Decrease ROM size by 25%"
          case _ => "Undefined"
        }
    }

    outputDStream.print()
    // This is the call the compiler rejects
    outputDStream.saveToEs("dfvalueoperations_v1/kwc")

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)
    // This starts the streaming context in the background.
    ssc.start()
    // Wait for at most 5 times the batchIntervalSeconds before returning.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

Error: Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String] outputDStream.saveToEs("kafkamessage_v1/kwc")

I made a few assumptions to get the required output.

1.) Header lines may appear in between the data rows, so a filter is used to drop the header:

Filesystem 1K-blocks Used Available Use% Mounted on

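2.) Since the Filesystem column may contain spaces, I extract Use% by taking the second-to-last element of the split row. (If this does not work for your input, try a regex with groups to achieve the same thing.)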

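3.) The case where the usage percentage is between 30 and 70 (inclusive) is not defined in the question, so for that case the output message is "Undefined".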
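The regex-with-groups alternative mentioned in assumption 2 could look roughly like the sketch below. The object name DfLine and the exact pattern are my own assumptions for Linux-style df rows (filesystem, three numeric columns, Use%, mount point); adjust the pattern if your df output has a different layout.

```scala
object DfLine {
  // Hypothetical pattern: filesystem name (may contain spaces), three numeric
  // columns, the Use% value, then the rest of the line (mount point).
  private val Row = """^(.+?)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d{1,3})%\s+(\S.*)$""".r

  /** Returns the Use% value, or None for header lines and malformed rows. */
  def usePercent(line: String): Option[Int] = line.trim match {
    case Row(_, _, _, _, pct, _) => Some(pct.toInt)
    case _                       => None
  }
}
```

Because the header line does not match the pattern, it yields None, so in a stream this could replace the separate header filter, for example with flatMap over the message values.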

Sample input and output (using Array[String]):

 scala> val input =
           |       """|Filesystem    512-blocks      Used Available Capacity iused      ifree %iused  Mounted on
           |          |/dev/disk1     234618880 154868528  79238352    67% 1784543 4293182736    0%   /
           |          |devfs                364       364         0   100%     630          0  100%   /dev
           |          |map -hosts             0         0         0   100%       0          0  100%   /net
           |          |map auto_home          0         0         0   100%       0          0  100%   /home""".stripMargin


 scala> val inputStr: Array[Array[String]] = input.split("\n").filter(!_.contains("Filesystem")).map(_.split("\\s+"))

 scala> val outputMessage = inputStr.map {
      |       row =>
      |         // Assuming the position is always second from last
      |         val elementPosition = row.length - 2 
      |         // if Use%>70 for any case> Message: Increase ROM size by 20%
      |         // if Use%<30% for any case> Message: Decrease ROM size by 25%
      |         val usePercent = row(elementPosition).replace("%", "").toInt
      |         usePercent match {
      |           case x if x > 70 => (usePercent, "Increase ROM size by 20%")
      |           case x if x < 30 => (usePercent, "Decrease ROM size by 25%")
      |           case _ => (usePercent, "Undefined")
      |         }
      |     }

 scala> outputMessage.foreach(println)
 (0,Decrease ROM size by 25%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)
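One thing to watch in this sample: the macOS-style df output has two percentage columns (Capacity and %iused), so row.length - 2 picks %iused. That is why the first row reports 0 even though its Capacity is 67%. If Capacity is the value you want, a minimal sketch is to take the first token that consists only of digits followed by %. The object name FirstPercent is hypothetical, and the sketch assumes no earlier column (such as a filesystem name) has that form.

```scala
object FirstPercent {
  // Matches a whole token such as "67%" and captures the number.
  private val Pct = """(\d{1,3})%""".r

  /** Returns the first "NN%" token in the line, or None if there is none. */
  def apply(line: String): Option[Int] =
    line.trim.split("\\s+").collectFirst { case Pct(n) => n.toInt }
}
```

With the first data row of the sample, FirstPercent returns Some(67). Header tokens like Use% or %iused contain no leading digits, so header lines give None.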

This code works for Array[String]; please test that it also works for ReceiverInputDStream[(String, String)]. The code should look something like this:

 val filteredStream: DStream[Array[String]] = kafkaStream
       .filter(!_._2.contains("Filesystem"))  // eliminate header
       .map(_._2.split("\\s+"))  // split on whitespace
 val outputDStream: DStream[String] = filteredStream.map {
       row =>
         val useIdx = row.length - 2
         // if Use%>70 for any case> Message: Increase ROM size by 20%
         // if Use%<30% for any case> Message: Decrease ROM size by 25%
         val usePercent = row(useIdx).replace("%", "").toInt
         usePercent match {
           case x if x > 70 => "Increase ROM size by 20%"
           case x if x < 30 => "Decrease ROM size by 25%"
           case _ => "Undefined"
         }
     }
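On the saveToEs compile error from the question: as far as I know, org.elasticsearch.spark.sql._ only adds saveToEs to DataFrames, while the DStream variant comes from org.elasticsearch.spark.streaming._ in elasticsearch-hadoop 5.0 and later. Elasticsearch also expects documents rather than bare strings, so one option is to map each row to a Map first. The sketch below shows only that mapping; the object name EsDoc and the field names are hypothetical, and the wiring in the trailing comment is untested.

```scala
object EsDoc {
  /** Builds a document from a split df row, using the same rules as above. */
  def fromRow(row: Array[String]): Map[String, Any] = {
    val usePercent = row(row.length - 2).replace("%", "").toInt
    val message =
      if (usePercent > 70) "Increase ROM size by 20%"
      else if (usePercent < 30) "Decrease ROM size by 25%"
      else "Undefined"
    // Field names are illustrative only
    Map("usePercent" -> usePercent, "message" -> message)
  }
}

// Possible wiring (assumes elasticsearch-hadoop 5.0+ on the classpath):
//   import org.elasticsearch.spark.streaming._
//   filteredStream.map(EsDoc.fromRow).saveToEs("dfvalueoperations_v1/kwc")
```

On older connector versions, calling saveToEs on each RDD inside foreachRDD (with import org.elasticsearch.spark._) may be an alternative.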

Hope this helps.