星火(流)RDD foreachPartitionAsync functionality/working

Spark (streaming) RDD foreachPartitionAsync functionality/working

我会提出实际问题,但请先容忍我的用例。我有以下用例,假设我从某个地方得到了 rddStud

val rddStud: RDD[(String,Student)] = ???

其中 'String' - 一些随机字符串和 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)

我使用 Student 仅作为示例 - 实际业务逻辑有很多不同的复杂 class 和许多字段。

我想要实现的是 - 具有相同id的学生必须按arrivalTime.

的升序处理

为此,我正在做的是:

//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => { 
  val student = tuple._2
  (student.id,student)
})

//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference    
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))

val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} => 
    student.arrivalTime
 }, ascending = true)

studSortedRdd.foreachPartition(itr =>{
    itr.foreach{ case (studentId, student) => { 
       val studentName = student.name
       val time = student.arrivalTime 
       //send for additional processing studentName and time combination
    }
})

我的问题是:

  1. 如果我使用 foreachPartitionAsync - 它会并行处理所有分区,但按顺序处理每个分区中的元素吗?如果不是,那么 foreachPartitionAsync 和 foreachAsync 之间有什么区别?
  2. 这种先分区后排序的做法,是否合理?或者您是否可以对上述逻辑提出任何优化建议?

非常感谢。

同步(foreach(Partition))和异步(foreach(Partition)Async)提交之间的选择,以及元素方式和分区方式访问之间的选择都不会影响执行顺序。在第一种情况下,阻塞与非阻塞执行的重要区别,在第二种情况下,数据公开的方式但实际执行机制或多或少相同。

重新分区后排序不是有效的方法。 sortBy 将触发完全随机播放并且不会保留现有的数据分布。如果你想保留现有的数据布局,你可以在随后的 mapPartitions 阶段进行排序,或者更好地使用 repartitionAndSortWithinPartitions.

class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int = {
    val x = key.asInstanceOf[Student].id.hashCode % n
    x + (if (x < 0) n else 0)
  }
}

val rddStud: RDD[Student] = ???
val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions)
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime)


{
  implicit val ord = arrTimeOrdering
  rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner)
}