Spark Streaming updateStateByKey for array aggregation
I have input rows like the following:
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2
And an output like the following, which is the vertical (column-wise) addition of the corresponding numbers:
file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]
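As a sanity check of that expected output, here is a minimal Spark-free Scala sketch (the values are just the file1 rows above) showing the vertical addition via transpose and per-column sum, the same idiom the programs below rely on:

    // the three value rows seen for file1 across t1 and t2
    val rowsForFile1 = List(Array(1, 1, 1), Array(1, 2, 3), Array(5, 5, 5))
    // transpose turns rows into columns; summing each column is the vertical addition
    val summed = rowsForFile1.map(_.toList).transpose.map(_.sum)
    println(summed)  // List(7, 8, 9), i.e. [1+1+5, 1+2+5, 1+3+5]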
Currently the data aggregation logic works within each batch interval, but it doesn't maintain state across batches. So I'm adding updateStateByKey and passing it the function below. Is this the right way to do it?
My current program:
def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = {
  // fall back to a zero array as wide as the incoming rows
  // (Array.fill[Int], not Array.fill[Byte], so the element types match)
  val previousCount = currentValue.getOrElse(
    Array.fill[Int](newValues.headOption.map(_.length).getOrElse(0))(0))
  // prepend the accumulated state to this batch's rows, then sum column-wise
  val allValues = previousCount +: newValues
  Some(allValues.toList.transpose.map(_.sum).toArray)
}
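To sanity-check the update function in isolation, here is a minimal sketch (the two sample batches are assumptions, mirroring the file1 rows above) that drives it by hand the way updateStateByKey would:

    // batch 1: no previous state exists for the key yet
    val afterBatch1 = updateValues(Seq(Array(1, 1, 1), Array(1, 2, 3)), None)
    // afterBatch1 == Some(Array(2, 3, 4))
    // batch 2: the t2 row arrives and is folded into the accumulated state
    val afterBatch2 = updateValues(Seq(Array(5, 5, 5)), afterBatch1)
    // afterBatch2 == Some(Array(7, 8, 9)), matching [1+1+5, 1+2+5, 1+3+5]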
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all streaming functionality
  val ssc = new StreamingContext(sc, Seconds(2))
  // parse the lines of data into coverage objects
  val inputStream = ssc.socketTextStream(<hostname>, 9999)
  ssc.checkpoint("hdfs://<hostname>:8020/user/spark/checkpoints_dir")
  inputStream.print(10)
  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
  val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)
  // Start the computation
  ssc.start()
  // Wait for the computation to terminate
  ssc.awaitTermination()
}
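One caveat with the program above: DStream transformations are lazy and only output operations trigger computation, so aggregated_file_counts as written is never materialized (only inputStream.print(10) runs). A minimal fix is to add an output operation on it before ssc.start(), for example:

    // force the stateful stream to be computed and shown each batch
    aggregated_file_counts.print()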
For reference, my previous program (without the stateful transformation):
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all streaming functionality
  val ssc = new StreamingContext(sc, Seconds(2))
  val inputStream = ssc.socketTextStream("hostname", 9999)
  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
    .reduceByKey((first, second) => {
      // needs: import scala.collection.mutable.ArrayBuffer
      val listOfArrays = ArrayBuffer(first, second)
      listOfArrays.toList.transpose.map(_.sum).toArray
    })
    .foreachRDD(rdd => rdd.foreach(Blaher.blah))
}
Thanks in advance.
What you're looking for is updateStateByKey. For a DStream[(T, U)] it should take a function of two arguments:
Seq[U] - representing the values seen for a key in the current batch
Option[U] - representing the accumulated state for that key
and return Option[U] (a hypothetical sketch of this contract follows below).
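For the shape of that contract in isolation, here is a hypothetical running-count sketch (pairs is an assumed DStream[(String, Int)]):

    // fold this batch's values into the running total per key;
    // returning None instead would drop the key from the state
    val runningTotals = pairs.updateStateByKey[Int](
      (newValues: Seq[Int], state: Option[Int]) =>
        Some(newValues.sum + state.getOrElse(0))
    )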
Given your code it could be implemented, for example, like this:
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

val state: DStream[(String, Array[Int])] = parsedDstream.updateStateByKey(
  (current: Seq[Array[Int]], prev: Option[Array[Int]]) => {
    // fold the previous state (if any) in as another row, then
    // sum the rows element-wise as Breeze dense vectors
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
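Two details of this version are worth noting: prev.map(_ +: current) folds the previous state in as just another row, and the Try turns any failure of the reduction (an empty sequence, or arrays of mismatched lengths, where Breeze vector addition throws) into None for that key instead of failing the batch.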
To use it you have to configure checkpointing.
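For example, a minimal sketch (the HDFS path is a placeholder assumption):

    // updateStateByKey keeps state across batches, so Spark requires a
    // checkpoint directory for the state lineage; set it before ssc.start()
    ssc.checkpoint("hdfs://<hostname>:8020/user/spark/checkpoints_dir")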