在键值 RDD 中寻找最大值

Finding Maximum in Key Value RDD

我有一个以下形式的键值 RDD:

(Some(23661587),
CompactBuffer(Posting(2,23661643,Some(23661587),0,None), 
              Posting(2,23661682,Some(23661587),0,None)))

这里Some(23661587)是键,CompactBuffer里面的数据是值。我想 select Posting 类型,每个键的特定属性具有最大值。

我该怎么做?我在 Scala 和 Spark 方面的经验有限。 谢谢

我用一些数据复制了你的例子。

正如@sinanspd 所说,org.apache.spark.util.collection.CompactBufferscala.collection.immutable.Seq 扩展而来, 你可以关注这个 link CompactBuffer, 因此您可以使用 scala.collection.immutable.Seq Seq 中的方法对 Seq 进行排序并获得 Posting 最大值。

我的选择是 Posting.value 对 Seq 进行排序,但它可以是 value2 或 Posting 中的任何字段 class。

举个例子

object FindingMaximum {

  val spark = SparkSession
    .builder()
    .appName("FindingMaximum")
    .master("local[*]")
    .getOrCreate()

  val sc = spark.sparkContext

  case class Posting(key: Int, value: Long, value2: Option[Long], value3: Int, value4: Option[Int])

  val data = List((Some(23661587),Seq(Posting(2,23661643,Some(23661587),0,None), Posting(2,23661682,Some(23661587),0,None))),
                  (Some(23661588),Seq(Posting(3,23661743,Some(23661588),0,None), Posting(3,23661682,Some(23661588),0,None))),
                  (Some(23661589),Seq(Posting(4,23661843,Some(23661589),0,None), Posting(4,23661882,Some(23661589),0,None))))

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    val rdd = sc.parallelize(data)

    val rddKeyMax = rdd.map({case(key, v) =>
      val max = v.sortBy(posting => posting.value).last
      (key, max)
    })
    rddKeyMax.foreach(println)
  }
}

/*
(Some(23661588),Posting(3,23661743,Some(23661588),0,None))
(Some(23661587),Posting(2,23661682,Some(23661587),0,None))
(Some(23661589),Posting(4,23661882,Some(23661589),0,None))
*/