Recursive operation on the same column in Pyspark
I have a dataframe like this:
Dataframe:
|SEQ_ID |TIME_STAMP |_MS |
+-------+-----------------------+------------------+
|3879826|2021-07-29 11:24:20.525|NaN |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|
|3879826|2021-07-29 11:27:43.264|27.247600203353613|
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|
|3879826|2021-07-29 11:36:19.128|13.011968111650264|
|3879826|2021-07-29 11:38:10.919|17.762006254598797|
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|
When _MS is >= 3 and the previous _MS is less than the current _MS, I want to increment a new column drift_MS by 100. But if _MS is < 3 and the previous _MS is less than the current _MS, I want to increment drift_MS by 1. If none of the conditions are met, I want to set the value to 0.
Expected output:
|SEQ_ID |TIME_STAMP |_MS |drift_MS|
+-------+-----------------------+------------------+--------+
|3879826|2021-07-29 11:24:20.525|NaN |0 |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0 |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100 |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |0 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |0 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|1 |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|2 |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|102 |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|202 |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|0 |
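To make the reset behaviour concrete, here is a tiny plain-Python sketch of the rules as I understand them (not Spark code; the _MS values are rounded), which reproduces the expected drift_MS values above:

# _MS values in row order (rounded); NaN compares as False in Python, so row 2 also resets
ms = [float('nan'), 21.26, 27.25, 18.14, 2.52, 2.71, 2.98, 13.01, 17.76, 1.97]

drift = 0
prev = None
for cur in ms:
    if prev is not None and prev < cur:     # "previous _MS less than current _MS"
        drift += 100 if cur >= 3 else 1     # +100 when _MS >= 3, otherwise +1
    else:
        drift = 0                           # neither condition met: reset to zero
    print(drift)
    prev = cur
# prints 0, 0, 100, 0, 0, 1, 2, 102, 202, 0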
I had a different version of this question where I just wanted to keep the previous value, and a very helpful contributor suggested I use the sum function like this:
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.functions import col, when

w1 = Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_MS = f.lag(col('_MS'), 1).over(w1)

df.withColumn('drift_MS',
              f.sum(
                  when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
                  .when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
                  .otherwise(0)
              ).over(w1))
This worked great when I wanted the previous drift_MS value to carry over if none of the conditions were met. However, I now need it to reset to zero when no condition is met.
I have been trying to figure this out, but I keep hitting a wall: I would seemingly need to loop back to the previous row iteratively, which is not how things are usually done in PySpark or big data in general, since it works best with column operations.
The following code does not work for me:
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.functions import col, when

w1 = Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_drift_MS_temp = f.lag(col('drift_MS_temp'), 1).over(w1)
prev_drift_MS = f.lag(col('drift_MS'), 1).over(w1)
prev_MS = f.lag(col('_MS'), 1).over(w1)

df.withColumn('drift_MS_temp',
              f.sum(
                  when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
                  .when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
                  .otherwise(0)
              ).over(w1))\
  .withColumn('drift_MS', when(prev_drift_MS_temp == col('drift_MS_temp'), 0)
              .otherwise(col('drift_MS_temp') - prev_drift_MS_temp + prev_drift_MS))
Any ideas on how to get around this?
Update:
So after racking my brain over this, the best logic I have come up with so far is to create a difference column from drift_MS and then take a conditional cumulative sum whenever the difference column is not 0.
So something like this:
|SEQ_ID |TIME_STAMP |_MS |drift_MS|_diff |drift |
+-------+-----------------------+------------------+--------+--------+--------+
|3879826|2021-07-29 11:24:20.525|NaN |0 |0 |0 |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0 |0 |0 |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100 |100 |100 |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |100 |0 |0 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |100 |0 |0 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|101 |1 |1 |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|102 |1 |1 |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|202 |100 |102 |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|302 |100 |202 |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|302 |0 |0 |
The pseudocode I envision would look something like this:
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.functions import col, when

w1 = Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_drift_MS = f.lag(col('drift_MS'), 1).over(w1)
prev_diff = f.lag(col('_diff'), 1).over(w1)
prev_MS = f.lag(col('_MS'), 1).over(w1)

df.withColumn('drift_MS',
              f.sum(
                  when((col('_MS') < 3) & (prev_MS < col('_MS')), 1)
                  .when((col('_MS') >= 3) & (prev_MS < col('_MS')), 100)
                  .otherwise(0)
              ).over(w1))\
  .withColumn('_diff', prev_drift_MS - col('drift_MS'))\
  .withColumn('drift', when(prev_diff == 0, 0).otherwise(f.sum(col('drift')).over(w1)))
What would be the right syntax to get it this way?
One option we can use is to create a bunch of helper columns before we get to the final drift_MS column. Let's go step by step.
- Create column x by applying the increment conditions you defined.
- Create column y as a flag marking where the value in column x resets to zero.
- Create column z to group together the rows that fall between flags. We can use a cumulative sum over the rows between the current row and unbounded following rows.
- Finally, create column drift_MS as the cumulative sum of x over rows partitioned by SEQ_ID and the helper column z, ordered by TIME_STAMP.
Putting these steps into code would look like this (it is easier to read in SQL expressions):
import pyspark.sql.functions as F

# Step 1: column x holds the increment defined by the conditions
expr_x = F.expr("""
    case
    when _MS >= 3 AND lag(_MS) over (partition by SEQ_ID order by TIME_STAMP) < _MS then 100
    when _MS < 3 AND lag(_MS) over (partition by SEQ_ID order by TIME_STAMP) < _MS then 1
    else 0 end """)

# Step 2: column y flags the last row of each non-zero run of x (the next x is 0)
expr_y = F.expr("""
    case
    when x <> 0 and lead(x) over (partition by SEQ_ID order by TIME_STAMP) = 0 then 1
    else null end """)

# Step 3: column z is a backward cumulative sum of the flags, giving a group id per run
expr_z = F.expr("""
    sum(y) over (partition by SEQ_ID
                 order by TIME_STAMP
                 rows between 0 preceding and unbounded following) """)

# Step 4: drift_MS is the cumulative sum of x within each (SEQ_ID, z) group
expr_drift = F.expr("""
    sum(x) over (partition by SEQ_ID, z
                 order by TIME_STAMP
                 rows between unbounded preceding and 0 following) """)
df = (df
.withColumn('x', expr_x)
.withColumn('y', expr_y)
.withColumn('z', expr_z)
.withColumn("drift_MS", expr_drift))
df.show()
# +-------+--------------------+------------------+---+----+----+--------+
# | SEQ_ID| TIME_STAMP| _MS| x| y| z|drift_MS|
# +-------+--------------------+------------------+---+----+----+--------+
# |3879826|2021-07-29 11:24:...| NaN| 0|null| 2| 0|
# |3879826|2021-07-29 11:25:...|21.262409581399556| 0|null| 2| 0|
# |3879826|2021-07-29 11:27:...|27.247600203353613|100| 1| 2| 100|
# |3879826|2021-07-29 11:29:...| 18.13528511851038| 0|null| 1| 0|
# |3879826|2021-07-29 11:31:...| 2.520896614376871| 0|null| 1| 0|
# |3879826|2021-07-29 11:32:...| 2.708193158560554| 1|null| 1| 1|
# |3879826|2021-07-29 11:34:...|2.9832290627235505| 1|null| 1| 2|
# |3879826|2021-07-29 11:36:...|13.011968111650264|100|null| 1| 102|
# |3879826|2021-07-29 11:38:...| 17.7620062545988|100| 1| 1| 202|
# |3879826|2021-07-29 11:40:...|1.9661930950977458| 0|null|null| 0|
# +-------+--------------------+------------------+---+----+----+--------+
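For completeness, here is what I believe is an equivalent formulation using the DataFrame window API from the question. This is an untested sketch that assumes it is applied to the original df (before the helper columns exist); the helpers can be dropped afterwards with result.drop('x', 'y', 'z').

import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.partitionBy('SEQ_ID').orderBy('TIME_STAMP')
w_fwd = w.rowsBetween(Window.currentRow, Window.unboundedFollowing)          # current row .. end
w_grp = (Window.partitionBy('SEQ_ID', 'z').orderBy('TIME_STAMP')
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))   # start .. current row

prev_MS = F.lag('_MS').over(w)

result = (df
          # x: increment defined by the conditions
          .withColumn('x', F.when((F.col('_MS') >= 3) & (prev_MS < F.col('_MS')), 100)
                            .when((F.col('_MS') < 3) & (prev_MS < F.col('_MS')), 1)
                            .otherwise(0))
          # y: flag the last row of each non-zero run of x
          .withColumn('y', F.when((F.col('x') != 0) & (F.lead('x').over(w) == 0), 1))
          # z: backward cumulative sum of the flags -> group id per run
          .withColumn('z', F.sum('y').over(w_fwd))
          # drift_MS: cumulative sum of x within each (SEQ_ID, z) group
          .withColumn('drift_MS', F.sum('x').over(w_grp)))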