星火(流)RDD foreachPartitionAsync functionality/working
Spark (streaming) RDD foreachPartitionAsync functionality/working
我会提出实际问题,但请先容忍我的用例。我有以下用例,假设我从某个地方得到了 rddStud
:
val rddStud: RDD[(String,Student)] = ???
其中 'String' - 一些随机字符串和 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)
我使用 Student 仅作为示例 - 实际业务逻辑有很多不同的复杂 class 和许多字段。
我想要实现的是 - 具有相同id
的学生必须按arrivalTime
.
的升序处理
为此,我正在做的是:
//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => {
val student = tuple._2
(student.id,student)
})
//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))
val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} =>
student.arrivalTime
}, ascending = true)
studSortedRdd.foreachPartition(itr =>{
itr.foreach{ case (studentId, student) => {
val studentName = student.name
val time = student.arrivalTime
//send for additional processing studentName and time combination
}
})
我的问题是:
- 如果我使用 foreachPartitionAsync - 它会并行处理所有分区,但按顺序处理每个分区中的元素吗?如果不是,那么 foreachPartitionAsync 和 foreachAsync 之间有什么区别?
- 这种先分区后排序的做法,是否合理?或者您是否可以对上述逻辑提出任何优化建议?
非常感谢。
同步(foreach(Partition)
)和异步(foreach(Partition)Async
)提交之间的选择,以及元素方式和分区方式访问之间的选择都不会影响执行顺序。在第一种情况下,阻塞与非阻塞执行的重要区别,在第二种情况下,数据公开的方式但实际执行机制或多或少相同。
重新分区后排序不是有效的方法。 sortBy
将触发完全随机播放并且不会保留现有的数据分布。如果你想保留现有的数据布局,你可以在随后的 mapPartitions
阶段进行排序,或者更好地使用 repartitionAndSortWithinPartitions
.
class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner {
def numPartitions: Int = n
def getPartition(key: Any): Int = {
val x = key.asInstanceOf[Student].id.hashCode % n
x + (if (x < 0) n else 0)
}
}
val rddStud: RDD[Student] = ???
val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions)
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime)
{
implicit val ord = arrTimeOrdering
rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner)
}
我会提出实际问题,但请先容忍我的用例。我有以下用例,假设我从某个地方得到了 rddStud
:
val rddStud: RDD[(String,Student)] = ???
其中 'String' - 一些随机字符串和 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)
我使用 Student 仅作为示例 - 实际业务逻辑有很多不同的复杂 class 和许多字段。
我想要实现的是 - 具有相同id
的学生必须按arrivalTime
.
为此,我正在做的是:
//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => {
val student = tuple._2
(student.id,student)
})
//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))
val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} =>
student.arrivalTime
}, ascending = true)
studSortedRdd.foreachPartition(itr =>{
itr.foreach{ case (studentId, student) => {
val studentName = student.name
val time = student.arrivalTime
//send for additional processing studentName and time combination
}
})
我的问题是:
- 如果我使用 foreachPartitionAsync - 它会并行处理所有分区,但按顺序处理每个分区中的元素吗?如果不是,那么 foreachPartitionAsync 和 foreachAsync 之间有什么区别?
- 这种先分区后排序的做法,是否合理?或者您是否可以对上述逻辑提出任何优化建议?
非常感谢。
同步(foreach(Partition)
)和异步(foreach(Partition)Async
)提交之间的选择,以及元素方式和分区方式访问之间的选择都不会影响执行顺序。在第一种情况下,阻塞与非阻塞执行的重要区别,在第二种情况下,数据公开的方式但实际执行机制或多或少相同。
重新分区后排序不是有效的方法。 sortBy
将触发完全随机播放并且不会保留现有的数据分布。如果你想保留现有的数据布局,你可以在随后的 mapPartitions
阶段进行排序,或者更好地使用 repartitionAndSortWithinPartitions
.
class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner {
def numPartitions: Int = n
def getPartition(key: Any): Int = {
val x = key.asInstanceOf[Student].id.hashCode % n
x + (if (x < 0) n else 0)
}
}
val rddStud: RDD[Student] = ???
val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions)
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime)
{
implicit val ord = arrTimeOrdering
rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner)
}