Spark Streaming updateStateByKey for array aggregation

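I have input lines like the following:

t1, file1, 1, 1, 1

t1, file1, 1, 2, 3

t1, file2, 2, 2, 2, 2

t2, file1, 5, 5, 5

t2, file2, 1, 1, 2, 2

and the desired output shown below, which is the element-wise ("vertical") sum of the corresponding numbers for each file.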

file1 : [ 1+1+5, 1+2+5, 1+3+5 ]

file2 : [ 2+1, 2+1, 2+2, 2+2 ]
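To make the expected result concrete, here is a minimal plain-Scala sketch (no Spark) that computes the same vertical sums over the sample lines. The names `VerticalSum`, `sumColumns`, and `parse` are made up for this illustration, and it assumes that all rows for a given file have the same length.

```scala
object VerticalSum {
  // Element-wise sum of equally long rows (assumption: rows is non-empty
  // and every row has the same length, otherwise transpose throws).
  def sumColumns(rows: Seq[Array[Int]]): Array[Int] =
    rows.map(_.toList).transpose.map(_.sum).toArray

  // Parse "t1, file1, 1, 2, 3" into ("file1", Array(1, 2, 3)); the timestamp is dropped.
  def parse(line: String): (String, Array[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt))
  }

  val lines = Seq(
    "t1, file1, 1, 1, 1",
    "t1, file1, 1, 2, 3",
    "t1, file2, 2, 2, 2, 2",
    "t2, file1, 5, 5, 5",
    "t2, file2, 1, 1, 2, 2"
  )

  // Group the parsed rows by file name and sum each group column by column.
  // file1 -> 7, 8, 9 and file2 -> 3, 3, 4, 4
  val totals: Map[String, Array[Int]] =
    lines.map(parse).groupBy(_._1).map { case (file, rows) =>
      file -> sumColumns(rows.map(_._2))
    }
}
```

This covers all lines at once; the streaming version has to reach the same totals while seeing the t1 and t2 lines in different batches.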

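Currently the aggregation logic works within a single batch interval, but it does not maintain state across batches. So I am adding updateStateByKey and passing it the function below. Is this the right way to do it?

My current program: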

    def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = {
      val previousCount = currentValue.getOrElse(Array.fill[Byte](newValues.length)(0))
      val allValues = newValues +: previousCount
      Some(allValues.toList.transpose.map(_.sum).toArray)
    }

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(2))
    // parse the lines of data into coverage objects
    val inputStream = ssc.socketTextStream(<hostname>, 9999)
    ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir")
    inputStream.print(10)
    val parsedDstream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
      })
    val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }

For reference, my previous program (without the stateful transformation):

def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("HBaseStream")
        val sc = new SparkContext(conf)
        // create a StreamingContext, the main entry point for all streaming functionality
        val ssc = new StreamingContext(sc, Seconds(2))
        val inputStream = ssc.socketTextStream("hostname", 9999)
        val parsedDstream = inputStream
          .map(line => {
            val splitLines = line.split(",")
            (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
          })
          .reduceByKey((first, second) => {
            val listOfArrays = ArrayBuffer(first, second)
            listOfArrays.toList.transpose.map(_.sum).toArray
          })
          .foreachRDD(rdd => rdd.foreach(Blaher.blah))
    }

Thanks in advance.

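What you are looking for is updateStateByKey. For a DStream[(T, U)], it takes a function with two arguments:

  • Seq[U] - the values for the key in the current window (batch)
  • Option[U] - the accumulated state for the key

and returns Option[U].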

Given your code, it could be implemented like this, for example:

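import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.streaming.dstream.DStream
import scala.util.Try

val state: DStream[(String, Array[Int])] = parsedDstream.updateStateByKey(
  (current: Seq[Array[Int]], prev: Option[Array[Int]]) => {
    // Put the previous state (if any) in front of this batch's arrays, then
    // sum them element-wise as Breeze vectors. Try turns any failure, such as
    // arrays of different lengths, into None.
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })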

To use it, you must configure checkpointing (your current program already does this with ssc.checkpoint).
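The contract of the update function can also be tried out without Spark or Breeze. The following is only a sketch under stated assumptions: `ArrayState` and `update` are hypothetical names, and it sums with `zip`, which silently truncates to the shorter array instead of failing on mismatched lengths as the Breeze version does.

```scala
object ArrayState {
  // Same shape as the function updateStateByKey expects for U = Array[Int]:
  // the arrays seen in this batch plus the optional accumulated state.
  def update(current: Seq[Array[Int]], prev: Option[Array[Int]]): Option[Array[Int]] = {
    // Accumulated state first (if any), then this batch's arrays.
    val all = prev.toList ++ current
    if (all.isEmpty) None // nothing to aggregate, so no state is kept
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }

  // Simulate two batches for the key "file1" from the question.
  val afterT1: Option[Array[Int]] = update(Seq(Array(1, 1, 1), Array(1, 2, 3)), None)
  val afterT2: Option[Array[Int]] = update(Seq(Array(5, 5, 5)), afterT1)
}
```

Here `afterT1` holds 2, 3, 4 and `afterT2` holds 7, 8, 9, which matches the expected output for file1. A batch with no new values for a key leaves its state unchanged, since only the previous array is reduced.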