通过自定义函数进行 Spark 流式分组

Spark streaming group by custom function

我有如下输入行

t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2

我想实现如下所示的输出,即相应数字的垂直相加。

file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]

我在 spark streaming 上下文中,我很难找到按文件名聚合的方法。

看来我需要使用类似下面的东西,我不确定如何获得正确的语法。任何输入都会有所帮助。

myDStream.foreachRDD(rdd => rdd.groupBy())myDStream.foreachRDD(rdd => rdd.aggregate())

我知道如何计算给定数字数组的垂直求和,但我不确定如何将该函数提供给聚合器。

def compute_counters(counts : ArrayBuffer[List[Int]]) = {
  counts.toList.transpose.map(_.sum)
}

首先,您需要从逗号分隔的字符串中提取相关的键和值,解析它们,并创建一个包含键的元组,以及使用 InputDStream.map. Then, use PairRDDFunctions.reduceByKey 应用总和的整数列表键:

dStream
.map(line => {
  val splitLines = line.split(", ")
  (splitLines(1), splitLines.slice(2, splitLines.length).map(_.toInt))
})
.reduceByKey((first, second) => (first._1, Array(first._2.sum + second._2.sum))
.foreachRDD((key, sum) => println(s"Key: $key, sum: ${sum.head}")

reduce 将生成一个 (String, Array[Int]) 的元组,其中字符串包含 id(可以是 "test1" 或 "test2"),以及一个具有单个值的数组,其中包含每个键的总和。

感谢 Yuval,我能够使用您的方法做到这一点。更新我的最终工作代码:

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(2))
    val inputStream = ssc.socketTextStream("hostname", 9999)
    val parsedDstream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
      })
      .reduceByKey((first, second) => {
        val listOfArrays = ArrayBuffer(first, second)
        listOfArrays.toList.transpose.map(_.sum).toArray
      })
      .foreachRDD(rdd => rdd.foreach(Blaher.blah))
}