spark mapPartitionsWithIndex 处理空分区
spark mapPartitionsWithIndex handling empty partitions
在mapPartitionsWithIndex
中应该如何处理空分区?
可以找到完整的示例:https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
我的目标是通过 RDD 用最后一个已知值填充 nan 值,作为对 .
的改进
但有些分区不包含任何值:
###################### carry
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
###################### carry
case class FooBar(foo: Option[Date], bar: String)
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
("2016-wrongFormat", "noValidFormat"),
("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
myDf.rdd.filter(x => notMissing(Some(x))).count
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap
使用时
val toCarryBd = spark.sparkContext.broadcast(toCarry)
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
if (iter.isEmpty) {
iter
} else {
var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
iter.map(foo => {
println("original ", foo)
if (!notMissing(Some(foo))) {
println("replaced")
// this will go into the default case
// FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar)
FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error
} else {
lastNotNullRow = Some(foo)
foo
}
})
}
}
val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
填写它会崩溃的值。
编辑
如果应用来自答案的输入,则输出。仍然不是 100%
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-04| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
就处理 mapPartitions(和类似)时的空分区而言,一般方法是 return 当您有一个空输入迭代器时,使用正确类型的空迭代器。
看起来您的代码正在执行此操作,但您的应用程序逻辑中似乎存在错误(即它假设如果分区有一条记录缺少值,则它要么在中有前一行相同的分区是好的,或者前一个分区不是空的并且有一个好的行 - 不一定是这种情况)。您已经部分解决了这个问题,方法是遍历并为每个分区收集最后一个先前的好值,然后如果您在分区开始时没有好值,请在收集的数组中查找该值。
但是,如果这也发生在前一个分区为空的同时,您将需要去查找之前的前一个分区值,直到找到您要查找的分区。 (请注意,这假设您的数据集中的第一条记录有效,否则您的代码仍然会失败)。
您的解决方案非常接近工作,但只是有一些不一定总是成立的小假设。
在mapPartitionsWithIndex
中应该如何处理空分区?
可以找到完整的示例:https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
我的目标是通过 RDD 用最后一个已知值填充 nan 值,作为对
但有些分区不包含任何值:
###################### carry
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
###################### carry
case class FooBar(foo: Option[Date], bar: String)
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
("2016-wrongFormat", "noValidFormat"),
("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
myDf.rdd.filter(x => notMissing(Some(x))).count
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap
使用时
val toCarryBd = spark.sparkContext.broadcast(toCarry)
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
if (iter.isEmpty) {
iter
} else {
var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
iter.map(foo => {
println("original ", foo)
if (!notMissing(Some(foo))) {
println("replaced")
// this will go into the default case
// FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar)
FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error
} else {
lastNotNullRow = Some(foo)
foo
}
})
}
}
val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
填写它会崩溃的值。
编辑
如果应用来自答案的输入,则输出。仍然不是 100%
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-04| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
就处理 mapPartitions(和类似)时的空分区而言,一般方法是 return 当您有一个空输入迭代器时,使用正确类型的空迭代器。
看起来您的代码正在执行此操作,但您的应用程序逻辑中似乎存在错误(即它假设如果分区有一条记录缺少值,则它要么在中有前一行相同的分区是好的,或者前一个分区不是空的并且有一个好的行 - 不一定是这种情况)。您已经部分解决了这个问题,方法是遍历并为每个分区收集最后一个先前的好值,然后如果您在分区开始时没有好值,请在收集的数组中查找该值。
但是,如果这也发生在前一个分区为空的同时,您将需要去查找之前的前一个分区值,直到找到您要查找的分区。 (请注意,这假设您的数据集中的第一条记录有效,否则您的代码仍然会失败)。
您的解决方案非常接近工作,但只是有一些不一定总是成立的小假设。