Spark / Scala:向前填充最后的观察
Spark / Scala: forward fill with last observation
使用 Spark 1.4.0、Scala 2.10
我一直在尝试找出一种方法来使用最后已知的观察结果转发填充空值,但我没有找到一种简单的方法。我认为这是一件很常见的事情,但找不到说明如何执行此操作的示例。
我看到函数用一个值前向填充 NaN,或者滞后/前导函数通过偏移量填充或移动数据,但没有找到最后一个已知值。
在线查看,我在 R 中看到很多 Q/A 相同的内容,但在 Spark / Scala 中却没有。
我正在考虑在日期范围内进行映射,从结果中过滤掉 NaN 并选择最后一个元素,但我想我对语法感到困惑。
我尝试使用 DataFrames
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
但这对我没有任何帮助。
过滤部分不起作用;映射函数 return 是 spark.sql.Columns 的一个序列,但是过滤函数期望 return 一个布尔值,所以我需要从列中获取一个值进行测试,但似乎只有是 return 列的列方法。
有什么方法可以在 Spark 上做更多 'simply' 吗?
感谢您的意见
编辑:
简单示例示例输入:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
预期输出:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
注:
- 我有很多列,其中很多列都有这种缺失数据模式,但并不相同 date/time。如果需要,我会一次转换一列。
编辑:
根据@zero323 的回答,我尝试了这种方式:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
广播变量最终是一个没有空值的列表。这是进步,但我仍然无法使映射工作。
但我什么也没得到,因为索引 i
没有映射到原始数据,它映射到没有 null 的子集。
我在这里错过了什么?
编辑和解决方案(根据@zero323 的回答推断):
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
如果您使用的是 RDD 而不是 DataFrames,请参阅下面的 zero323 的回答以获得更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您正在寻求优化,请查看 RDD 解决方案。
初始答案(单一时间序列假设):
首先,如果您不能提供 PARTITION BY
子句,请尽量避免使用 window 函数。它将数据移动到单个分区,因此大多数时候根本不可行。
您可以使用 mapPartitionsWithIndex
填补 RDD
上的空白。由于您没有提供示例数据或预期输出,因此将其视为伪代码而不是真正的 Scala 程序:
首先让我们按日期排序 DataFrame
并转换为 RDD
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
接下来让我们找到每个分区的最后一个非空观察
def notMissing(row: Row): Boolean = ???
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
.mapPartitionsWithIndex{ case (i, iter) =>
Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
并将此Map
转换为广播
val toCarryBd = sc.broadcast(toCarry)
终于映射分区再次填补空白:
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
// If it is the beginning of partition and value is missing
// extract value to fill from toCarryBd.value
// Remember to correct for empty / only missing partitions
// otherwise take last not-null from the current partition
}
val imputed: RDD[Row] = rows
.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
最终转换回DataFrame
编辑(每组数据的分区/时间序列):
细节决定成败。如果你的数据毕竟是分区的,那么使用 groupBy
就可以解决整个问题。假设您简单地按类型 T
的列 "v" 进行分区,并且 Date
是一个整数时间戳:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
这样您可以同时填写所有列。
Can this be done with DataFrames instead of converting back and forth to RDD?
这取决于,尽管它不太可能有效。如果最大间隙相对较小,你可以这样做:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column
val maxGap: Int = ??? // Maximum gap between observations
val columnsToFill: List[String] = ??? // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed
// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
// Generate lag values between 1 and maxGap
val lags = (1 to maxGap).map(lag(col(c), _)over(w))
// Add current, coalesce and set alias
coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}
// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))
// Finally select
val dfImputed = df.select($"*" :: lags: _*)
可以轻松调整以使用每列不同的最大间距。
在最新的 Spark 版本中获得类似结果的更简单方法是使用 last
和 ignoreNulls
:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"k").orderBy($"Date")
.rowsBetween(Window.unboundedPreceding, -1)
df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
虽然可以删除 partitionBy
子句并在全球范围内应用此方法,但对于大型数据集来说,这将非常昂贵。
可以只使用 Window 函数(没有 last 函数)和某种巧妙的分区来做到这一点。我个人非常不喜欢必须使用 groupBy 的组合然后进一步加入。
所以给出:
date, currency, rate
20190101 JPY NULL
20190102 JPY 2
20190103 JPY NULL
20190104 JPY NULL
20190102 JPY 3
20190103 JPY 4
20190104 JPY NULL
我们可以使用Window.unboundedPreceding和Window.unboundedFollowing来创建前向和后向填充的键。
以下代码:
val w1 = Window.partitionBy("currency").orderBy(asc("date"))
df
.select("date", "currency", "rate")
// Equivalent of fill.na(0, Seq("rate")) but can be more generic here
// You may need an abs(col("rate")) if value col can be negative since it will not work with the following sums to build the foward and backward keys
.withColumn("rate_filled", when(col("rate").isNull, lit(0)).otherwise(col("rate)))
.withColumn("rate_backsum",
sum("rate_filled").over(w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
.withColumn("rate_forwardsum",
sum("rate_filled").over(w1.rowsBetween(Window.currentRow, Window.unboundedFollowing)))
给出:
date, currency, rate, rate_filled, rate_backsum, rate_forwardsum
20190101 JPY NULL 0 0 9
20190102 JPY 2 2 2 9
20190103 JPY NULL 0 2 7
20190104 JPY NULL 0 2 7
20190102 JPY 3 3 5 7
20190103 JPY 4 4 9 4
20190104 JPY NULL 0 9 0
因此,我们构建了两个密钥(x_backsum 和x_forwardsum),可用于填充和填充。使用以下两条迷你图:
val wb = Window.partitionBy("currency", "rate_backsum")
val wf = Window.partitionBy("currency", "rate_forwardsum")
...
.withColumn("rate_backfilled", avg("rate").over(wb))
.withColumn("rate_forwardfilled", avg("rate").over(wf))
最后:
date, currency, rate, rate_backsum, rate_forwardsum, rate_ffilled
20190101 JPY NULL 0 9 2
20190102 JPY 2 2 9 2
20190103 JPY NULL 2 7 3
20190104 JPY NULL 2 7 3
20190102 JPY 3 5 7 3
20190103 JPY 4 9 4 4
20190104 JPY NULL 9 0 0
使用 Spark 1.4.0、Scala 2.10
我一直在尝试找出一种方法来使用最后已知的观察结果转发填充空值,但我没有找到一种简单的方法。我认为这是一件很常见的事情,但找不到说明如何执行此操作的示例。
我看到函数用一个值前向填充 NaN,或者滞后/前导函数通过偏移量填充或移动数据,但没有找到最后一个已知值。
在线查看,我在 R 中看到很多 Q/A 相同的内容,但在 Spark / Scala 中却没有。
我正在考虑在日期范围内进行映射,从结果中过滤掉 NaN 并选择最后一个元素,但我想我对语法感到困惑。
我尝试使用 DataFrames
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
但这对我没有任何帮助。
过滤部分不起作用;映射函数 return 是 spark.sql.Columns 的一个序列,但是过滤函数期望 return 一个布尔值,所以我需要从列中获取一个值进行测试,但似乎只有是 return 列的列方法。
有什么方法可以在 Spark 上做更多 'simply' 吗?
感谢您的意见
编辑:
简单示例示例输入:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
预期输出:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
注:
- 我有很多列,其中很多列都有这种缺失数据模式,但并不相同 date/time。如果需要,我会一次转换一列。
编辑:
根据@zero323 的回答,我尝试了这种方式:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
广播变量最终是一个没有空值的列表。这是进步,但我仍然无法使映射工作。
但我什么也没得到,因为索引 i
没有映射到原始数据,它映射到没有 null 的子集。
我在这里错过了什么?
编辑和解决方案(根据@zero323 的回答推断):
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
如果您使用的是 RDD 而不是 DataFrames,请参阅下面的 zero323 的回答以获得更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您正在寻求优化,请查看 RDD 解决方案。
初始答案(单一时间序列假设):
首先,如果您不能提供 PARTITION BY
子句,请尽量避免使用 window 函数。它将数据移动到单个分区,因此大多数时候根本不可行。
您可以使用 mapPartitionsWithIndex
填补 RDD
上的空白。由于您没有提供示例数据或预期输出,因此将其视为伪代码而不是真正的 Scala 程序:
首先让我们按日期排序
DataFrame
并转换为RDD
import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val rows: RDD[Row] = df.orderBy($"Date").rdd
接下来让我们找到每个分区的最后一个非空观察
def notMissing(row: Row): Boolean = ??? val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows .mapPartitionsWithIndex{ case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) } .collectAsMap
并将此
Map
转换为广播val toCarryBd = sc.broadcast(toCarry)
终于映射分区再次填补空白:
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { // If it is the beginning of partition and value is missing // extract value to fill from toCarryBd.value // Remember to correct for empty / only missing partitions // otherwise take last not-null from the current partition } val imputed: RDD[Row] = rows .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
最终转换回DataFrame
编辑(每组数据的分区/时间序列):
细节决定成败。如果你的数据毕竟是分区的,那么使用 groupBy
就可以解决整个问题。假设您简单地按类型 T
的列 "v" 进行分区,并且 Date
是一个整数时间戳:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
这样您可以同时填写所有列。
Can this be done with DataFrames instead of converting back and forth to RDD?
这取决于,尽管它不太可能有效。如果最大间隙相对较小,你可以这样做:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column
val maxGap: Int = ??? // Maximum gap between observations
val columnsToFill: List[String] = ??? // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed
// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
// Generate lag values between 1 and maxGap
val lags = (1 to maxGap).map(lag(col(c), _)over(w))
// Add current, coalesce and set alias
coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}
// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))
// Finally select
val dfImputed = df.select($"*" :: lags: _*)
可以轻松调整以使用每列不同的最大间距。
在最新的 Spark 版本中获得类似结果的更简单方法是使用 last
和 ignoreNulls
:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"k").orderBy($"Date")
.rowsBetween(Window.unboundedPreceding, -1)
df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
虽然可以删除 partitionBy
子句并在全球范围内应用此方法,但对于大型数据集来说,这将非常昂贵。
可以只使用 Window 函数(没有 last 函数)和某种巧妙的分区来做到这一点。我个人非常不喜欢必须使用 groupBy 的组合然后进一步加入。
所以给出:
date, currency, rate
20190101 JPY NULL
20190102 JPY 2
20190103 JPY NULL
20190104 JPY NULL
20190102 JPY 3
20190103 JPY 4
20190104 JPY NULL
我们可以使用Window.unboundedPreceding和Window.unboundedFollowing来创建前向和后向填充的键。
以下代码:
val w1 = Window.partitionBy("currency").orderBy(asc("date"))
df
.select("date", "currency", "rate")
// Equivalent of fill.na(0, Seq("rate")) but can be more generic here
// You may need an abs(col("rate")) if value col can be negative since it will not work with the following sums to build the foward and backward keys
.withColumn("rate_filled", when(col("rate").isNull, lit(0)).otherwise(col("rate)))
.withColumn("rate_backsum",
sum("rate_filled").over(w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
.withColumn("rate_forwardsum",
sum("rate_filled").over(w1.rowsBetween(Window.currentRow, Window.unboundedFollowing)))
给出:
date, currency, rate, rate_filled, rate_backsum, rate_forwardsum
20190101 JPY NULL 0 0 9
20190102 JPY 2 2 2 9
20190103 JPY NULL 0 2 7
20190104 JPY NULL 0 2 7
20190102 JPY 3 3 5 7
20190103 JPY 4 4 9 4
20190104 JPY NULL 0 9 0
因此,我们构建了两个密钥(x_backsum 和x_forwardsum),可用于填充和填充。使用以下两条迷你图:
val wb = Window.partitionBy("currency", "rate_backsum")
val wf = Window.partitionBy("currency", "rate_forwardsum")
...
.withColumn("rate_backfilled", avg("rate").over(wb))
.withColumn("rate_forwardfilled", avg("rate").over(wf))
最后:
date, currency, rate, rate_backsum, rate_forwardsum, rate_ffilled
20190101 JPY NULL 0 9 2
20190102 JPY 2 2 9 2
20190103 JPY NULL 2 7 3
20190104 JPY NULL 2 7 3
20190102 JPY 3 5 7 3
20190103 JPY 4 9 4 4
20190104 JPY NULL 9 0 0