PySpark - 如何根据列中的两个值从数据框中过滤出连续的行块
PySpark - How to filter a consecutive chunk of rows out of a dataframe based on two values in a column
我有一个数据框,我想使用 pyspark
基于某些列值创建另一个数据框。
例如:下面是我的主要数据框 -
Part1 Part2 Part3 Part4
aaa up 24 k-123
bbb down 45 i-98
ccc down 54 k-89
fff int 23 l-34
xyz up 22 o-89
www up 89 u-56
现在,我想创建另一个数据框,它将搜索第一次出现的 'down',直到第一次出现 'up'。因此,预期的数据帧将是:
Part1 Part2 Part3 Part4
bbb down 45 i-98
ccc down 54 k-89
fff int 23 l-34
xyz up 22 o-89
步骤 1: 创建 DataFrame
。
from pyspark.sql.functions import when, col, lit
df = spark.createDataFrame(
[('aaa','up',24,'k-123'),('bbb','down',45,'i-98'),('ccc','down',54,'k-89'),
('fff','int', 23,'l-34'),('xyz','up',22,'o-89'),('www','up',89,'u-56')],
schema = ['Part1','Part2','Part3','Part4']
)
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| aaa| up| 24|k-123|
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
| www| up| 89| u-56|
+-----+-----+-----+-----+
第 2 步: 首先我们需要找到第一个出现的 down
并删除它上面的所有行。为此,我们创建一个列 cumulative
,如果 Part2
== down
则值为 1,否则为 0,最后对该列进行累加和。
df = df.withColumn('Dummy',lit('dummy'))
df = df.withColumn('cumulative',when(col('Part2')=='down',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df.show()
+-----+-----+-----+-----+-----+----------+
|Part1|Part2|Part3|Part4|Dummy|cumulative|
+-----+-----+-----+-----+-----+----------+
| aaa| up| 24|k-123|dummy| 0|
| bbb| down| 45| i-98|dummy| 1|
| ccc| down| 54| k-89|dummy| 2|
| fff| int| 23| l-34|dummy| 2|
| xyz| up| 22| o-89|dummy| 2|
| www| up| 89| u-56|dummy| 2|
+-----+-----+-----+-----+-----+----------+
现在,删除累计和为 0 的所有行。这将删除所有行,直到 down
第一次出现。
df = df.where(col('cumulative')>=1)
第 3 步: 执行与上面第 2 步相同的操作,除了对 up
执行此操作并删除列 cumulative
中值的所有行小于或等于 1。这样我们将删除第一次出现 up
.
以下的所有行
df = df.withColumn('cumulative',when(col('Part2')=='up',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df = df.where(col('cumulative')<=1).drop('Dummy','cumulative')
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
+-----+-----+-----+-----+
我有一个数据框,我想使用 pyspark
基于某些列值创建另一个数据框。
例如:下面是我的主要数据框 -
Part1 Part2 Part3 Part4
aaa up 24 k-123
bbb down 45 i-98
ccc down 54 k-89
fff int 23 l-34
xyz up 22 o-89
www up 89 u-56
现在,我想创建另一个数据框,它将搜索第一次出现的 'down',直到第一次出现 'up'。因此,预期的数据帧将是:
Part1 Part2 Part3 Part4
bbb down 45 i-98
ccc down 54 k-89
fff int 23 l-34
xyz up 22 o-89
步骤 1: 创建 DataFrame
。
from pyspark.sql.functions import when, col, lit
df = spark.createDataFrame(
[('aaa','up',24,'k-123'),('bbb','down',45,'i-98'),('ccc','down',54,'k-89'),
('fff','int', 23,'l-34'),('xyz','up',22,'o-89'),('www','up',89,'u-56')],
schema = ['Part1','Part2','Part3','Part4']
)
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| aaa| up| 24|k-123|
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
| www| up| 89| u-56|
+-----+-----+-----+-----+
第 2 步: 首先我们需要找到第一个出现的 down
并删除它上面的所有行。为此,我们创建一个列 cumulative
,如果 Part2
== down
则值为 1,否则为 0,最后对该列进行累加和。
df = df.withColumn('Dummy',lit('dummy'))
df = df.withColumn('cumulative',when(col('Part2')=='down',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df.show()
+-----+-----+-----+-----+-----+----------+
|Part1|Part2|Part3|Part4|Dummy|cumulative|
+-----+-----+-----+-----+-----+----------+
| aaa| up| 24|k-123|dummy| 0|
| bbb| down| 45| i-98|dummy| 1|
| ccc| down| 54| k-89|dummy| 2|
| fff| int| 23| l-34|dummy| 2|
| xyz| up| 22| o-89|dummy| 2|
| www| up| 89| u-56|dummy| 2|
+-----+-----+-----+-----+-----+----------+
现在,删除累计和为 0 的所有行。这将删除所有行,直到 down
第一次出现。
df = df.where(col('cumulative')>=1)
第 3 步: 执行与上面第 2 步相同的操作,除了对 up
执行此操作并删除列 cumulative
中值的所有行小于或等于 1。这样我们将删除第一次出现 up
.
df = df.withColumn('cumulative',when(col('Part2')=='up',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df = df.where(col('cumulative')<=1).drop('Dummy','cumulative')
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
+-----+-----+-----+-----+