Run a Cumulative/Iterative Custom Method on a Column in Spark Scala
Hi, I am new to Spark/Scala and I have been trying (and so far failing) to create a column in a Spark dataframe based on a particular recursive formula. Here it is in pseudocode:
someDf.col2[0] = 0
for i > 0
someDf.col2[i] = x * someDf.col1[i-1] + (1-x) * someDf.col2[i-1]
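To make the recursion concrete, here is a minimal sketch of what the formula computes on a plain Scala collection; the value x = 0.5 and the sample col1 values are assumptions purely for illustration.
// Minimal sketch of the recursion on a plain Seq (x and the sample values are assumed for illustration)
val x = 0.5
val col1 = Seq(0.0, 1.0, 1.0, 0.0)
// col2(0) = 0; col2(i) = x * col1(i-1) + (1 - x) * col2(i-1)
val col2 = col1.scanLeft(0.0)((prevCol2, prevCol1) => x * prevCol1 + (1 - x) * prevCol2).init
// col2 == Seq(0.0, 0.0, 0.5, 0.75)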
To get into a bit more detail, here is my starting point: this dataframe is the result of aggregations at the level of dates and of an individual id. All further calculations have to be done for that particular id, and have to take into account what happened in the previous week.
To illustrate this I have simplified the values to zeros and ones, removed the multipliers x and 1 - x, and also initialized col2 to zero.
var someDf = Seq(("2016-01-10 00:00:00.0","385608",0,0),
("2016-01-17 00:00:00.0","385608",0,0),
("2016-01-24 00:00:00.0","385608",1,0),
("2016-01-31 00:00:00.0","385608",1,0),
("2016-02-07 00:00:00.0","385608",1,0),
("2016-02-14 00:00:00.0","385608",1,0),
("2016-01-17 00:00:00.0","105010",0,0),
("2016-01-24 00:00:00.0","105010",1,0),
("2016-01-31 00:00:00.0","105010",0,0),
("2016-02-07 00:00:00.0","105010",1,0)
).toDF("dates", "id", "col1","col2" )
someDf.show()
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 0|
|2016-02-07 00:00:...|385608| 1| 0|
|2016-02-14 00:00:...|385608| 1| 0|
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 0|
|2016-02-07 00:00:...|105010| 1| 0|
+--------------------+------+----+----+
What I have tried so far vs. what I want
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2", lag($"col1",1 ).over(date_id_window) +
lag($"col2",1 ).over(date_id_window) ).show()
+--------------------+------+----+----+ / +--------------------+
| dates| id|col1|col2| / | what_col2_should_be|
+--------------------+------+----+----+ / +--------------------+
|2016-01-17 00:00:...|105010| 0|null| / | 0|
|2016-01-24 00:00:...|105010| 1| 0| / | 0|
|2016-01-31 00:00:...|105010| 0| 1| / | 1|
|2016-02-07 00:00:...|105010| 1| 0| / | 1|
|2016-01-10 00:00:...|385608| 0|null| / | 0|
|2016-01-17 00:00:...|385608| 0| 0| / | 0|
|2016-01-24 00:00:...|385608| 1| 0| / | 0|
|2016-01-31 00:00:...|385608| 1| 1| / | 1|
|2016-02-07 00:00:...|385608| 1| 1| / | 2|
|2016-02-14 00:00:...|385608| 1| 1| / | 3|
+--------------------+------+----+----+ / +--------------------+
Is there a way to do this with Spark dataframes? I have seen plenty of cumulative-type calculations, but never one that feeds back into the same column. I think the problem is that the newly calculated value of row i-1 is never taken into account; instead the old value of row i-1 is used, which is always 0.
Any help would be greatly appreciated.
A Dataset should work just fine:
val x = 0.1
case class Record(dates: String, id: String, col1: Int)
// Work with a typed Dataset: group the rows by id, sort each group by date, and
// fold the recursive formula over the sorted rows with scanLeft, carrying col2 along.
someDf.drop("col2").as[Record].groupByKey(_.id).flatMapGroups((_, records) => {
  val sorted = records.toSeq.sortBy(_.dates)
  sorted.scanLeft((null: Record, 0.0)){
    case ((_, col2), record) => (record, x * record.col1 + (1 - x) * col2)
  }.tail  // drop the (null, 0.0) seed
}).select($"_1.*", $"_2".alias("col2"))  // flatten the (Record, col2) tuples back into columns
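A small usage note: this relies on the implicit encoders and the $ column syntax, and the rows come back in whatever order the groups are processed; a sketch of how the result might be displayed, assuming the usual spark session value and that the expression above has been assigned to a hypothetical val named result.
import spark.implicits._              // needed for .as[Record], groupByKey and the $ syntax (assumes a SparkSession named spark)
result.orderBy("id", "dates").show()  // hypothetical val holding the expression above, sorted so each id's rows appear in date order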
You can use the rowsBetween api with the Window function you are already using, and you should get the desired output.
val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2",
    sum(lag($"col1", 1).over(date_id_window))                // previous row's col1 within the id partition
      .over(date_id_window.rowsBetween(Long.MinValue, 0)))   // summed from the start of the partition up to the current row
  .withColumn("col2", when($"col2".isNull, lit(0)).otherwise($"col2"))  // the first row has no lag, so replace null with 0
  .show()
Given the input dataframe as
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 0|
|2016-02-07 00:00:...|385608| 1| 0|
|2016-02-14 00:00:...|385608| 1| 0|
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 0|
|2016-02-07 00:00:...|105010| 1| 0|
+--------------------+------+----+----+
you should have the following output dataframe after applying the above logic
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 1|
|2016-02-07 00:00:...|105010| 1| 1|
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 1|
|2016-02-07 00:00:...|385608| 1| 2|
|2016-02-14 00:00:...|385608| 1| 3|
+--------------------+------+----+----+
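As a side note, the null handling for the first row of each partition could equally be written with coalesce instead of the when/otherwise pair; a minimal sketch reusing the same window definition as above.
someDf.withColumn("col2",
    coalesce(
      sum(lag($"col1", 1).over(date_id_window))
        .over(date_id_window.rowsBetween(Long.MinValue, 0)),
      lit(0)))   // coalesce falls back to 0 wherever the windowed sum is null
  .show()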
I hope the answer is helpful.
You should be applying transformations to your dataframe rather than treating it as a var. One way to get what you want is to use Window's rowsBetween to cumulatively sum col1 over the rows within each window partition up through the previous row (i.e. row -1):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Long.MinValue, -1)  // frame: all rows before the current one

val newDF = someDf.
  withColumn(
    "col2", sum($"col1").over(window)                    // running sum of col1 over the preceding rows
  ).withColumn(
    "col2", when($"col2".isNull, 0).otherwise($"col2")   // the first row of each id has an empty frame, so null -> 0
  ).orderBy("id", "dates")
newDF.show
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 1|
|2016-02-07 00:00:...|105010| 1| 1|
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 1|
|2016-02-07 00:00:...|385608| 1| 2|
|2016-02-14 00:00:...|385608| 1| 3|
+--------------------+------+----+----+
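As a small aside, on Spark 2.1 and later the frame boundary can be spelled with the named constant instead of the raw Long.MinValue; a sketch, with the rest of the code unchanged.
// Equivalent frame definition using the named boundary constant (Spark 2.1+)
val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Window.unboundedPreceding, -1)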