How can I deal with tuples in Spark Streaming?
I'm having trouble with Spark in Scala: I want to multiply tuple elements in Spark Streaming. I get data from Kafka into a DStream, and my RDD data looks like this:
(2,[2,3,4,6,5])
(4,[2,3,4,6,5])
(7,[2,3,4,6,5])
(9,[2,3,4,6,5])
I want to multiply each element of the list by the key, like this:
(2,[2*2,3*2,4*2,6*2,5*2])
(4,[2*4,3*4,4*4,6*4,5*4])
(7,[2*7,3*7,4*7,6*7,5*7])
(9,[2*9,3*9,4*9,6*9,5*9])
Then I get an RDD like this:
(2,[4,6,8,12,10])
(4,[8,12,16,24,20])
(7,[14,21,28,42,35])
(9,[18,27,36,54,45])
Finally, I keep only the smallest of the second elements, like this:
(2,4)
(4,8)
(7,14)
(9,18)
How can I do this with Scala on a DStream? I'm using Spark version 1.6.
Here is a Scala demo:
// val conf = new SparkConf().setAppName("ttt").setMaster("local")
// val sc = new SparkContext(conf)
// val data = Array("2,2,3,4,6,5", "4,2,3,4,6,5", "7,2,3,4,6,5", "9,2,3,4,6,5")
// val lines = sc.parallelize(data)
// change `lines` to your data (each RDD in the stream)
lines
  .map { x =>
    val fields = x.split(",").map(_.toInt)  // parse "key,v1,v2,..." in one pass
    (fields.head, fields.tail.toList)       // (key, list of values)
  }
  .map(x => (x._1, x._2.min))               // take the minimum value first...
  .map(x => (x._1, x._2 * x._1))            // ...then multiply by the key: k * min(v) == min(k * v) for positive keys
  .foreach(x => println(x))
Here is the result:
(2,4)
(4,8)
(7,14)
(9,18)
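In a real streaming job, the same logic would run once per micro-batch via foreachRDD. Below is a minimal sketch of that wiring, assuming the records arrive as "key,v1,v2,..." strings; the socket source is only a stand-in for the Kafka source, and the setup and variable names here are illustrative, not part of the original demo:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ttt").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// stand-in source; in the real job this would come from KafkaUtils.createDirectStream
val lines = ssc.socketTextStream("localhost", 9999)

lines.foreachRDD { rdd =>                   // applied to the RDD of each micro-batch
  rdd.map { x =>
      val fields = x.split(",").map(_.toInt)
      (fields.head, fields.tail.min * fields.head)  // (key, min value * key)
    }
    .foreach(x => println(x))
}

ssc.start()
ssc.awaitTermination()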
Each RDD in a DStream contains the data for a specific time interval, so you can manipulate each RDD however you like.
Suppose you get the tuple RDD in a variable input:
import scala.collection.mutable.ListBuffer

val result = input
  .map(x => {                      // for each (key, values) pair
    val l = new ListBuffer[Int]()  // a new list for storing the multiplication results
    for (i <- x._2) {              // for each element in the value list
      l += x._1 * i                // append the value multiplied by the key
    }
    (x._1, l.toList)               // return the new tuple
  })
  .map(x => {
    (x._1, x._2.min)               // return the new tuple with the minimum element of the list
  })
result.foreach(println)
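For what it's worth, the two map steps can also be collapsed into a single pattern-matching map; this is just an equivalent, more compact variant, not the original code:

val result = input.map { case (key, values) =>
  (key, values.map(_ * key).min)  // multiply every value by the key, keep the minimum
}
result.foreach(println)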
This should result in:
(2,4)
(4,8)
(7,14)
(9,18)