如何比较多行?
How to compare multiple rows?
我想将两个连续的行 i
与 col2
的 i-1
进行比较(按 col1
排序)。
如果第 i
行的 item_i
和 item_[i-1]_row
不同,我想将 item_[i-1]
的计数加 1。
+--------------+
| col1 col2 |
+--------------+
| row_1 item_1 |
| row_2 item_1 |
| row_3 item_2 |
| row_4 item_1 |
| row_5 item_2 |
| row_6 item_1 |
+--------------+
在上面的例子中,如果我们一次向下扫描两行,我们看到 row_2
和 row_3
不同,因此我们在 item_1 上加一。接下来,我们看到 row_3
与 row_4
不同,然后在 item_2
上加一。继续,直到我们结束:
+-------------+
| col2 col3 |
+-------------+
| item_1 2 |
| item_2 2 |
+-------------+
您可以结合使用 window 函数和聚合函数来执行此操作。 window 函数用于获取 col2
的下一个值(使用 col1
进行排序)。然后聚合计算我们遇到差异的次数。这是在下面的代码中实现的:
val data = Seq(
("row_1", "item_1"),
("row_2", "item_1"),
("row_3", "item_2"),
("row_4", "item_1"),
("row_5", "item_2"),
("row_6", "item_1")).toDF("col1", "col2")
import org.apache.spark.sql.expressions.Window
val q = data.
withColumn("col2_next",
coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
groupBy($"col2").
agg(sum($"col2" =!= $"col2_next" cast "int") as "col3")
scala> q.show
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+----+
| col2|col3|
+------+----+
|item_1| 2|
|item_2| 2|
+------+----+
我想将两个连续的行 i
与 col2
的 i-1
进行比较(按 col1
排序)。
如果第 i
行的 item_i
和 item_[i-1]_row
不同,我想将 item_[i-1]
的计数加 1。
+--------------+
| col1 col2 |
+--------------+
| row_1 item_1 |
| row_2 item_1 |
| row_3 item_2 |
| row_4 item_1 |
| row_5 item_2 |
| row_6 item_1 |
+--------------+
在上面的例子中,如果我们一次向下扫描两行,我们看到 row_2
和 row_3
不同,因此我们在 item_1 上加一。接下来,我们看到 row_3
与 row_4
不同,然后在 item_2
上加一。继续,直到我们结束:
+-------------+
| col2 col3 |
+-------------+
| item_1 2 |
| item_2 2 |
+-------------+
您可以结合使用 window 函数和聚合函数来执行此操作。 window 函数用于获取 col2
的下一个值(使用 col1
进行排序)。然后聚合计算我们遇到差异的次数。这是在下面的代码中实现的:
val data = Seq(
("row_1", "item_1"),
("row_2", "item_1"),
("row_3", "item_2"),
("row_4", "item_1"),
("row_5", "item_2"),
("row_6", "item_1")).toDF("col1", "col2")
import org.apache.spark.sql.expressions.Window
val q = data.
withColumn("col2_next",
coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
groupBy($"col2").
agg(sum($"col2" =!= $"col2_next" cast "int") as "col3")
scala> q.show
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+----+
| col2|col3|
+------+----+
|item_1| 2|
|item_2| 2|
+------+----+