如何将当前行的值与下一个相除?
How to divide the value of current row with the following one?
在 Spark-Sql 1.6 版本中,使用 DataFrame
s,有没有办法计算,对于特定列,当前行和下一行的分数,对于每一行?
例如,如果我有一个只有一列的 table,就像这样
Age
100
50
20
4
我想要以下输出
Franction
2
2.5
5
最后一行被删除,因为没有要添加的 "next row"。
现在我正在通过对 table 进行排名并将其与自身合并来做到这一点,其中 rank
等于 rank+1
。
有更好的方法吗?
这可以用 Window
函数来完成吗?
Window
函数应该只做部分技巧。可以通过定义 udf
函数
来完成其他部分技巧
def div = udf((age: Double, lag: Double) => lag/age)
首先我们需要使用 Window
函数找到 lag
然后将 lag
和 age
传递给 udf
函数来找到 div
导入 sqlContext.implicits._
导入 org.apache.spark.sql.functions._
val dataframe = Seq(
("A",100),
("A",50),
("A",20),
("A",4)
).toDF("person", "Age")
val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc)
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec))
最后调用 udf 函数
newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show
最终输出将是
+------+---+
|person|div|
+------+---+
| A|2.0|
| A|2.5|
| A|5.0|
+------+---+
已编辑
由于@Jacek 提出了一个更好的解决方案,即使用 .na.drop
而不是 .filter(newDF("lag").isNotNull)
并使用 /
运算符,因此我们甚至不需要调用 udf
函数
newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show
在 Spark-Sql 1.6 版本中,使用 DataFrame
s,有没有办法计算,对于特定列,当前行和下一行的分数,对于每一行?
例如,如果我有一个只有一列的 table,就像这样
Age
100
50
20
4
我想要以下输出
Franction
2
2.5
5
最后一行被删除,因为没有要添加的 "next row"。
现在我正在通过对 table 进行排名并将其与自身合并来做到这一点,其中 rank
等于 rank+1
。
有更好的方法吗?
这可以用 Window
函数来完成吗?
Window
函数应该只做部分技巧。可以通过定义 udf
函数
def div = udf((age: Double, lag: Double) => lag/age)
首先我们需要使用 Window
函数找到 lag
然后将 lag
和 age
传递给 udf
函数来找到 div
导入 sqlContext.implicits._
导入 org.apache.spark.sql.functions._
val dataframe = Seq(
("A",100),
("A",50),
("A",20),
("A",4)
).toDF("person", "Age")
val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc)
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec))
最后调用 udf 函数
newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show
最终输出将是
+------+---+
|person|div|
+------+---+
| A|2.0|
| A|2.5|
| A|5.0|
+------+---+
已编辑
由于@Jacek 提出了一个更好的解决方案,即使用 .na.drop
而不是 .filter(newDF("lag").isNotNull)
并使用 /
运算符,因此我们甚至不需要调用 udf
函数
newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show