Compare consecutive rows and extract words(excluding the subsets) in spark

I am working with a Spark dataframe. The input dataframe is shown below (Table 1). I need to write logic that gets the maximum-length keywords for each session id; multiple keywords will be part of the output for each session id. The expected output looks like Table 2.

Input dataframe:

(Table 1)
|-----------+------------+-----------------------------------|
| session_id| value      |  Timestamp                        |
|-----------+------------+-----------------------------------|
|     1     | cat        | 2021-01-11T13:48:54.2514887-05:00 |
|     1     | catc       | 2021-01-11T13:48:54.3514887-05:00 |
|     1     | catch      | 2021-01-11T13:48:54.4514887-05:00 |
|     1     | par        | 2021-01-11T13:48:55.2514887-05:00 |
|     1     | part       | 2021-01-11T13:48:56.5514887-05:00 |
|     1     | party      | 2021-01-11T13:48:57.7514887-05:00 |
|     1     | partyy     | 2021-01-11T13:48:58.7514887-05:00 |
|     2     | fal        | 2021-01-11T13:49:54.2514887-05:00 |
|     2     | fall       | 2021-01-11T13:49:54.3514887-05:00 |
|     2     | falle      | 2021-01-11T13:49:54.4514887-05:00 |
|     2     | fallen     | 2021-01-11T13:49:54.8514887-05:00 |
|     2     | Tem        | 2021-01-11T13:49:56.5514887-05:00 |
|     2     | Temp       | 2021-01-11T13:49:56.7514887-05:00 |
|-----------+------------+-----------------------------------|

Expected output:

 (Table 2)
|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     1     | partyy     |
|     2     | fallen     |
|     2     | Temp       |
|-----------+------------|

The solution I tried:

I added another column named col_length, which captures the length of each word in the value column. Then I tried comparing each row with the following row to see whether it has the maximum length. But this solution works only partially.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.read.parquet("/project/project_name/abc")

val dfM = df
  .select($"session_id", $"value", $"Timestamp")
  .withColumn("col_length", length($"value"))

val ts = Window
  .orderBy("session_id")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = dfM
  .withColumn("running_max", max("col_length") over ts)
  .where($"running_max" === $"col_length")
  .select("session_id", "value", "Timestamp")

Current output:

|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     2     | fallen     |
|-----------+------------|

Multiple columns do not work in the orderBy clause of the window function, so I am not getting the desired output; I only get one row per session id. Any suggestions would be appreciated. Thanks in advance.

You can solve it with the lead function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Order rows by Timestamp within each session so that lead()
// deterministically picks the next word typed in the same session.
val windowSpec = Window.partitionBy("session_id").orderBy("Timestamp")

dfM
  .withColumn("lead", lead("value", 1).over(windowSpec))
  // Keep a word when the next one is shorter (a new prefix run starts)
  // or when there is no next word (end of the session).
  .filter(length($"lead") < length($"value") || $"lead".isNull)
  .drop("lead")
  .show()
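The idea behind the lead() comparison can be sketched in plain Scala, without Spark: within one session (in timestamp order), keep a word only if the following word is shorter or there is no following word. This is an illustrative sketch; `lastOfRuns` is a hypothetical helper name, not part of the answer above.

```scala
object LeadSketch {
  // Keep a word when the next word in the sequence is shorter,
  // or when it is the last word (no "lead" exists).
  def lastOfRuns(words: Seq[String]): Seq[String] =
    words.zipWithIndex.collect {
      case (w, i) if i == words.length - 1 || words(i + 1).length < w.length => w
    }

  def main(args: Array[String]): Unit = {
    // Session 1 from Table 1: two prefix runs, "cat.." and "par..".
    println(lastOfRuns(Seq("cat", "catc", "catch", "par", "part", "party", "partyy")))
    // List(catch, partyy)
  }
}
```

Note that this keeps several words per session, which is exactly what the running-max attempt above cannot do: a running max over the whole session retains only the single longest word.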