通过自定义函数进行 Spark 流式分组
Spark streaming group by custom function
我有如下输入行
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2
我想实现如下所示的输出,即相应数字的垂直相加。
file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]
我在 spark streaming 上下文中,我很难找到按文件名聚合的方法。
看来我需要使用类似下面的东西,我不确定如何获得正确的语法。任何输入都会有所帮助。
myDStream.foreachRDD(rdd => rdd.groupBy())
或
myDStream.foreachRDD(rdd => rdd.aggregate())
我知道如何计算给定数字数组的垂直求和,但我不确定如何将该函数提供给聚合器。
def compute_counters(counts : ArrayBuffer[List[Int]]) = {
counts.toList.transpose.map(_.sum)
}
首先,您需要从逗号分隔的字符串中提取相关的键和值,解析它们,并创建一个包含键的元组,以及使用 InputDStream.map
. Then, use PairRDDFunctions.reduceByKey
应用总和的整数列表键:
dStream
.map(line => {
val splitLines = line.split(", ")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.toInt))
})
.reduceByKey((first, second) => (first._1, Array(first._2.sum + second._2.sum))
.foreachRDD((key, sum) => println(s"Key: $key, sum: ${sum.head}")
reduce 将生成一个 (String, Array[Int])
的元组,其中字符串包含 id(可以是 "test1" 或 "test2"),以及一个具有单个值的数组,其中包含每个键的总和。
感谢 Yuval,我能够使用您的方法做到这一点。更新我的最终工作代码:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream("hostname", 9999)
val parsedDstream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
})
.reduceByKey((first, second) => {
val listOfArrays = ArrayBuffer(first, second)
listOfArrays.toList.transpose.map(_.sum).toArray
})
.foreachRDD(rdd => rdd.foreach(Blaher.blah))
}
我有如下输入行
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2
我想实现如下所示的输出,即相应数字的垂直相加。
file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]
我在 spark streaming 上下文中,我很难找到按文件名聚合的方法。
看来我需要使用类似下面的东西,我不确定如何获得正确的语法。任何输入都会有所帮助。
myDStream.foreachRDD(rdd => rdd.groupBy())
或
myDStream.foreachRDD(rdd => rdd.aggregate())
我知道如何计算给定数字数组的垂直求和,但我不确定如何将该函数提供给聚合器。
def compute_counters(counts : ArrayBuffer[List[Int]]) = {
counts.toList.transpose.map(_.sum)
}
首先,您需要从逗号分隔的字符串中提取相关的键和值,解析它们,并创建一个包含键的元组,以及使用 InputDStream.map
. Then, use PairRDDFunctions.reduceByKey
应用总和的整数列表键:
dStream
.map(line => {
val splitLines = line.split(", ")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.toInt))
})
.reduceByKey((first, second) => (first._1, Array(first._2.sum + second._2.sum))
.foreachRDD((key, sum) => println(s"Key: $key, sum: ${sum.head}")
reduce 将生成一个 (String, Array[Int])
的元组,其中字符串包含 id(可以是 "test1" 或 "test2"),以及一个具有单个值的数组,其中包含每个键的总和。
感谢 Yuval,我能够使用您的方法做到这一点。更新我的最终工作代码:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream("hostname", 9999)
val parsedDstream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
})
.reduceByKey((first, second) => {
val listOfArrays = ArrayBuffer(first, second)
listOfArrays.toList.transpose.map(_.sum).toArray
})
.foreachRDD(rdd => rdd.foreach(Blaher.blah))
}