比较连续的行并提取spark中的单词(不包括子集)
Compare consecutive rows and extract words(excluding the subsets) in spark
我正在处理 spark 数据框。输入数据框如下所示 (Table 1)。我需要编写一个逻辑来获取每个会话 ID 的最大长度的关键字。有多个关键字将成为每个 sessionid 输出的一部分。预期输出看起来像 Table 2.
输入数据帧:
(Table 1)
|-----------+------------+-----------------------------------|
| session_id| value | Timestamp |
|-----------+------------+-----------------------------------|
| 1 | cat | 2021-01-11T13:48:54.2514887-05:00 |
| 1 | catc | 2021-01-11T13:48:54.3514887-05:00 |
| 1 | catch | 2021-01-11T13:48:54.4514887-05:00 |
| 1 | par | 2021-01-11T13:48:55.2514887-05:00 |
| 1 | part | 2021-01-11T13:48:56.5514887-05:00 |
| 1 | party | 2021-01-11T13:48:57.7514887-05:00 |
| 1 | partyy | 2021-01-11T13:48:58.7514887-05:00 |
| 2 | fal | 2021-01-11T13:49:54.2514887-05:00 |
| 2 | fall | 2021-01-11T13:49:54.3514887-05:00 |
| 2 | falle | 2021-01-11T13:49:54.4514887-05:00 |
| 2 | fallen | 2021-01-11T13:49:54.8514887-05:00 |
| 2 | Tem | 2021-01-11T13:49:56.5514887-05:00 |
| 2 | Temp | 2021-01-11T13:49:56.7514887-05:00 |
|-----------+------------+-----------------------------------|
预期输出:
(Table 2)
|-----------+------------+
| session_id| value |
|-----------+------------+
| 1 | catch |
| 1 | partyy |
| 2 | fallen |
| 2 | Temp |
|-----------+------------|
我试过的解决方案:
我添加了另一个名为 col_length 的列,它捕获值列中每个单词的长度。稍后尝试将每一行与其后续行进行比较,以查看它是否具有最大长度。但此解决方案仅适用于派对。
val df = spark.read.parquet("/project/project_name/abc")
val dfM = df.select($"session_id",$"value",$"Timestamp").withColumn("col_length",length($"value"))
val ts = Window
.orderBy("session_id")
.rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = dfM
.withColumn("running_max", max("col_length") over ts)
.where($"running_max" === $"col_length")
.select("session_id", "value", "Timestamp")
当前输出:
|-----------+------------+
| session_id| value |
|-----------+------------+
| 1 | catch |
| 2 | fallen |
|-----------+------------|
多列在带有 window 函数的 orderBy 子句中不起作用,所以我没有得到想要的 output.I 每个节 id 有 1 个输出。任何建议将不胜感激。提前致谢。
你可以用lead
函数解决:
val windowSpec = Window.orderBy("session_id")
dfM
.withColumn("lead",lead("value",1).over(windowSpec))
.filter((functions.length(col("lead")) < functions.length(col("value"))) || col("lead").isNull)
.drop("lead")
.show
我正在处理 spark 数据框。输入数据框如下所示 (Table 1)。我需要编写一个逻辑来获取每个会话 ID 的最大长度的关键字。有多个关键字将成为每个 sessionid 输出的一部分。预期输出看起来像 Table 2.
输入数据帧:
(Table 1)
|-----------+------------+-----------------------------------|
| session_id| value | Timestamp |
|-----------+------------+-----------------------------------|
| 1 | cat | 2021-01-11T13:48:54.2514887-05:00 |
| 1 | catc | 2021-01-11T13:48:54.3514887-05:00 |
| 1 | catch | 2021-01-11T13:48:54.4514887-05:00 |
| 1 | par | 2021-01-11T13:48:55.2514887-05:00 |
| 1 | part | 2021-01-11T13:48:56.5514887-05:00 |
| 1 | party | 2021-01-11T13:48:57.7514887-05:00 |
| 1 | partyy | 2021-01-11T13:48:58.7514887-05:00 |
| 2 | fal | 2021-01-11T13:49:54.2514887-05:00 |
| 2 | fall | 2021-01-11T13:49:54.3514887-05:00 |
| 2 | falle | 2021-01-11T13:49:54.4514887-05:00 |
| 2 | fallen | 2021-01-11T13:49:54.8514887-05:00 |
| 2 | Tem | 2021-01-11T13:49:56.5514887-05:00 |
| 2 | Temp | 2021-01-11T13:49:56.7514887-05:00 |
|-----------+------------+-----------------------------------|
预期输出:
(Table 2)
|-----------+------------+
| session_id| value |
|-----------+------------+
| 1 | catch |
| 1 | partyy |
| 2 | fallen |
| 2 | Temp |
|-----------+------------|
我试过的解决方案:
我添加了另一个名为 col_length 的列,它捕获值列中每个单词的长度。稍后尝试将每一行与其后续行进行比较,以查看它是否具有最大长度。但此解决方案仅适用于派对。
val df = spark.read.parquet("/project/project_name/abc")
val dfM = df.select($"session_id",$"value",$"Timestamp").withColumn("col_length",length($"value"))
val ts = Window
.orderBy("session_id")
.rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = dfM
.withColumn("running_max", max("col_length") over ts)
.where($"running_max" === $"col_length")
.select("session_id", "value", "Timestamp")
当前输出:
|-----------+------------+
| session_id| value |
|-----------+------------+
| 1 | catch |
| 2 | fallen |
|-----------+------------|
多列在带有 window 函数的 orderBy 子句中不起作用,所以我没有得到想要的 output.I 每个节 id 有 1 个输出。任何建议将不胜感激。提前致谢。
你可以用lead
函数解决:
val windowSpec = Window.orderBy("session_id")
dfM
.withColumn("lead",lead("value",1).over(windowSpec))
.filter((functions.length(col("lead")) < functions.length(col("value"))) || col("lead").isNull)
.drop("lead")
.show