在键值 RDD 中寻找最大值
Finding Maximum in Key Value RDD
我有一个以下形式的键值 RDD:
(Some(23661587),
CompactBuffer(Posting(2,23661643,Some(23661587),0,None),
Posting(2,23661682,Some(23661587),0,None)))
这里Some(23661587)
是键,CompactBuffer
里面的数据是值。我想 select Posting
类型,每个键的特定属性具有最大值。
我该怎么做?我在 Scala 和 Spark 方面的经验有限。
谢谢
我用一些数据复制了你的例子。
正如@sinanspd 所说,org.apache.spark.util.collection.CompactBuffer
从 scala.collection.immutable.Seq
扩展而来,
你可以关注这个 link CompactBuffer,
因此您可以使用 scala.collection.immutable.Seq
Seq 中的方法对 Seq 进行排序并获得 Posting
最大值。
我的选择是 Posting.value
对 Seq 进行排序,但它可以是 value2 或 Posting 中的任何字段 class。
举个例子
object FindingMaximum {
val spark = SparkSession
.builder()
.appName("FindingMaximum")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
case class Posting(key: Int, value: Long, value2: Option[Long], value3: Int, value4: Option[Int])
val data = List((Some(23661587),Seq(Posting(2,23661643,Some(23661587),0,None), Posting(2,23661682,Some(23661587),0,None))),
(Some(23661588),Seq(Posting(3,23661743,Some(23661588),0,None), Posting(3,23661682,Some(23661588),0,None))),
(Some(23661589),Seq(Posting(4,23661843,Some(23661589),0,None), Posting(4,23661882,Some(23661589),0,None))))
def main(args: Array[String]): Unit = {
sc.setLogLevel("ERROR")
val rdd = sc.parallelize(data)
val rddKeyMax = rdd.map({case(key, v) =>
val max = v.sortBy(posting => posting.value).last
(key, max)
})
rddKeyMax.foreach(println)
}
}
/*
(Some(23661588),Posting(3,23661743,Some(23661588),0,None))
(Some(23661587),Posting(2,23661682,Some(23661587),0,None))
(Some(23661589),Posting(4,23661882,Some(23661589),0,None))
*/
我有一个以下形式的键值 RDD:
(Some(23661587),
CompactBuffer(Posting(2,23661643,Some(23661587),0,None),
Posting(2,23661682,Some(23661587),0,None)))
这里Some(23661587)
是键,CompactBuffer
里面的数据是值。我想 select Posting
类型,每个键的特定属性具有最大值。
我该怎么做?我在 Scala 和 Spark 方面的经验有限。 谢谢
我用一些数据复制了你的例子。
正如@sinanspd 所说,org.apache.spark.util.collection.CompactBuffer
从 scala.collection.immutable.Seq
扩展而来,
你可以关注这个 link CompactBuffer,
因此您可以使用 scala.collection.immutable.Seq
Seq 中的方法对 Seq 进行排序并获得 Posting
最大值。
我的选择是 Posting.value
对 Seq 进行排序,但它可以是 value2 或 Posting 中的任何字段 class。
举个例子
object FindingMaximum {
val spark = SparkSession
.builder()
.appName("FindingMaximum")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
case class Posting(key: Int, value: Long, value2: Option[Long], value3: Int, value4: Option[Int])
val data = List((Some(23661587),Seq(Posting(2,23661643,Some(23661587),0,None), Posting(2,23661682,Some(23661587),0,None))),
(Some(23661588),Seq(Posting(3,23661743,Some(23661588),0,None), Posting(3,23661682,Some(23661588),0,None))),
(Some(23661589),Seq(Posting(4,23661843,Some(23661589),0,None), Posting(4,23661882,Some(23661589),0,None))))
def main(args: Array[String]): Unit = {
sc.setLogLevel("ERROR")
val rdd = sc.parallelize(data)
val rddKeyMax = rdd.map({case(key, v) =>
val max = v.sortBy(posting => posting.value).last
(key, max)
})
rddKeyMax.foreach(println)
}
}
/*
(Some(23661588),Posting(3,23661743,Some(23661588),0,None))
(Some(23661587),Posting(2,23661682,Some(23661587),0,None))
(Some(23661589),Posting(4,23661882,Some(23661589),0,None))
*/