How to apply conditions to groupby dataframe in PySpark
I have a dataframe like this:
ID  Transaction_time  Status  final_time
1   1981-01-12        hit
1   1981-01-13        hit
1   1981-01-14        good    1981-01-15
1   1981-01-15        OK      1981-01-16
2   1981-01-06        good    1981-01-17
3   1981-01-07        hit     1981-01-16
4   1981-01-06        hit
4   1981-01-07        good
4   1981-01-08        good    1981-01-10
I want to keep an ID if:
- its Status values include both "hit" and "good"/"OK"
- the final_time of its last Transaction_time is not null

Then, I want to extract:
- id - the ID
- status - the Status at the last Transaction_time
- start_time - the Transaction_time when Status changes from "hit" to "good"
- finish_time - the final_time of the last Transaction_time
For the example above, it would be:
id  status  start_time  finish_time
1   OK      1981-01-14  1981-01-16
4   good    1981-01-07  1981-01-10
How can I achieve this in PySpark?
Your start time is when the status is "good". You can create a column that keeps the date only where the status is "good" and group on it. I tried my approach below; hope this helps.
from pyspark.sql import functions as f
df.show()
+---+----------------+------+----------+
| ID|Transaction_time|Status|final_time|
+---+----------------+------+----------+
| 1| 1981-01-12| hit| null|
| 1| 1981-01-13| hit| null|
| 1| 1981-01-14| good|1981-01-15|
| 1| 1981-01-15| OK|1981-01-16|
| 2| 1981-01-06| good|1981-01-17|
| 3| 1981-01-07| hit|1981-01-16|
| 4| 1981-01-06| hit| null|
| 4| 1981-01-07| good| null|
| 4| 1981-01-08| good|1981-01-10|
+---+----------------+------+----------+
# Keep Transaction_time only for rows whose Status is 'good'
df = df.withColumn('trans_time', f.when(f.col('Status') == 'good', f.col('Transaction_time')).otherwise(None))
df.show()
+---+----------------+------+----------+----------+
| ID|Transaction_time|Status|final_time|trans_time|
+---+----------------+------+----------+----------+
| 1| 1981-01-12| hit| null| null|
| 1| 1981-01-13| hit| null| null|
| 1| 1981-01-14| good|1981-01-15|1981-01-14|
| 1| 1981-01-15| OK|1981-01-16| null|
| 2| 1981-01-06| good|1981-01-17|1981-01-06|
| 3| 1981-01-07| hit|1981-01-16| null|
| 4| 1981-01-06| hit| null| null|
| 4| 1981-01-07| good| null|1981-01-07|
| 4| 1981-01-08| good|1981-01-10|1981-01-08|
+---+----------------+------+----------+----------+
# A group qualifies when its lexicographic max Status is 'hit' and its min is 'good'/'OK'
cnd1 = f.when((f.max('Status') == 'hit') & (f.min('Status').isin(['OK', 'good'])), f.first('trans_time', ignorenulls=True))
cnd2 = f.when((f.max('Status') == 'hit') & (f.min('Status').isin(['OK', 'good'])), f.last('final_time', ignorenulls=True))
df.groupby('id').agg(cnd1.name('start_time'), f.min('Status').name('status'), cnd2.name('finish_time')).dropna().show()
+---+----------+------+-----------+
| id|start_time|status|finish_time|
+---+----------+------+-----------+
| 1|1981-01-14| OK| 1981-01-16|
| 4|1981-01-08| good| 1981-01-10|
+---+----------+------+-----------+
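The group condition works because Spark compares Status strings lexicographically, and in ASCII the uppercase "OK" sorts before lowercase "good" and "hit". A minimal check of that ordering (a sketch, assuming an active SparkSession named spark):

spark.createDataFrame([('hit',), ('good',), ('OK',)], ['Status']) \
    .agg(f.min('Status').name('min_st'), f.max('Status').name('max_st')).show()
# +------+------+
# |min_st|max_st|
# +------+------+
# |    OK|   hit|
# +------+------+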
I mostly used window functions instead of groupby:
w1 = Window.partitionBy('ID').orderBy(F.col('Transaction_time').desc())
w2 = Window.partitionBy('ID').orderBy(F.col('final_time').desc())
# next_st / next_tt: Status and Transaction_time of the chronologically next row
# (lag over a descending window looks one row forward in time);
# max_tt / max_ft: latest Transaction_time and latest final_time per ID
df2 = df1.withColumn('next_st', F.lag('Status', 1).over(w1)) \
    .withColumn('next_tt', F.lag('Transaction_time', 1).over(w1)) \
    .withColumn('max_tt', F.max('Transaction_time').over(w1)) \
    .withColumn('max_ft', F.max('final_time').over(w2))
# Keep only IDs whose last transaction has a non-null final_time
df3 = df2.join(df2.filter((F.col('Transaction_time') == F.col('max_tt')) & F.col('final_time').isNotNull()), 'ID', 'leftsemi')
# Rows where Status changes from 'hit' to 'good'/'OK'
df4 = df3.filter((F.col('Status') == 'hit') & F.col('next_st').isin(['good', 'OK']))
df5 = (
df4.alias('df4')
.join(df1.alias('df1'), (df1.ID == df4.ID) & (F.col('df1.final_time') == F.col('df4.max_ft')))
.select(
F.col('df4.ID').alias('id'),
F.col('df1.Status').alias('status'),
F.col('df4.next_tt').alias('start_time'),
F.col('df4.max_ft').alias('finish_time')
)
)
df5.show()
# +---+------+----------+-----------+
# | id|status|start_time|finish_time|
# +---+------+----------+-----------+
# | 4| good|1981-01-07| 1981-01-10|
# | 1| OK|1981-01-14| 1981-01-16|
# +---+------+----------+-----------+
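Note that lag over a window ordered by Transaction_time descending reads the previous row in the sort order, which is the chronologically next row; that is how next_st and next_tt peek at the following transaction. A minimal sketch of that behaviour, reusing w1 and assuming an active SparkSession named spark:

(spark.createDataFrame([(1, '1981-01-12', 'hit'), (1, '1981-01-13', 'good')],
                       ['ID', 'Transaction_time', 'Status'])
    .withColumn('next_st', F.lag('Status', 1).over(w1))
    .show())
# +---+----------------+------+-------+
# | ID|Transaction_time|Status|next_st|
# +---+----------------+------+-------+
# |  1|      1981-01-13|  good|   null|
# |  1|      1981-01-12|   hit|   good|
# +---+----------------+------+-------+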
Imports:
from pyspark.sql import functions as F, Window
Original dataset:
data = [
(1, '1981-01-12', 'hit', None),
(1, '1981-01-13', 'hit', None),
(1, '1981-01-14', 'good', '1981-01-15'),
(1, '1981-01-15', 'OK', '1981-01-16'),
(2, '1981-01-06', 'good', '1981-01-17'),
(3, '1981-01-07', 'hit', '1981-01-16'),
(4, '1981-01-06', 'hit', None),
(4, '1981-01-07', 'good', None),
(4, '1981-01-08', 'good', '1981-01-10')]
df1 = spark.createDataFrame(data, ['ID', 'Transaction_time', 'Status', 'final_time'])
df1 = df1.withColumn('Transaction_time', F.col('Transaction_time').cast('date')) \
.withColumn('final_time', F.col('final_time').cast('date'))
df1.show()
# +---+----------------+------+----------+
# | ID|Transaction_time|Status|final_time|
# +---+----------------+------+----------+
# | 1| 1981-01-12| hit| null|
# | 1| 1981-01-13| hit| null|
# | 1| 1981-01-14| good|1981-01-15|
# | 1| 1981-01-15| OK|1981-01-16|
# | 2| 1981-01-06| good|1981-01-17|
# | 3| 1981-01-07| hit|1981-01-16|
# | 4| 1981-01-06| hit| null|
# | 4| 1981-01-07| good| null|
# | 4| 1981-01-08| good|1981-01-10|
# +---+----------------+------+----------+
Intermediate dataframes:
df1
+---+----------------+------+----------+
| ID|Transaction_time|Status|final_time|
+---+----------------+------+----------+
| 1| 1981-01-12| hit| null|
| 1| 1981-01-13| hit| null|
| 1| 1981-01-14| good|1981-01-15|
| 1| 1981-01-15| OK|1981-01-16|
| 2| 1981-01-06| good|1981-01-17|
| 3| 1981-01-07| hit|1981-01-16|
| 4| 1981-01-06| hit| null|
| 4| 1981-01-07| good| null|
| 4| 1981-01-08| good|1981-01-10|
+---+----------------+------+----------+
df2
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st| next_tt| max_tt| max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
| 1| 1981-01-15| OK|1981-01-16| null| null|1981-01-15|1981-01-16|
| 1| 1981-01-14| good|1981-01-15| OK|1981-01-15|1981-01-15|1981-01-16|
| 1| 1981-01-13| hit| null| good|1981-01-14|1981-01-15|1981-01-16|
| 1| 1981-01-12| hit| null| hit|1981-01-13|1981-01-15|1981-01-16|
| 3| 1981-01-07| hit|1981-01-16| null| null|1981-01-07|1981-01-16|
| 2| 1981-01-06| good|1981-01-17| null| null|1981-01-06|1981-01-17|
| 4| 1981-01-08| good|1981-01-10| null| null|1981-01-08|1981-01-10|
| 4| 1981-01-07| good| null| good|1981-01-08|1981-01-08|1981-01-10|
| 4| 1981-01-06| hit| null| good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+
df3
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st| next_tt| max_tt| max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
| 1| 1981-01-15| OK|1981-01-16| null| null|1981-01-15|1981-01-16|
| 1| 1981-01-14| good|1981-01-15| OK|1981-01-15|1981-01-15|1981-01-16|
| 1| 1981-01-13| hit| null| good|1981-01-14|1981-01-15|1981-01-16|
| 1| 1981-01-12| hit| null| hit|1981-01-13|1981-01-15|1981-01-16|
| 3| 1981-01-07| hit|1981-01-16| null| null|1981-01-07|1981-01-16|
| 2| 1981-01-06| good|1981-01-17| null| null|1981-01-06|1981-01-17|
| 4| 1981-01-08| good|1981-01-10| null| null|1981-01-08|1981-01-10|
| 4| 1981-01-07| good| null| good|1981-01-08|1981-01-08|1981-01-10|
| 4| 1981-01-06| hit| null| good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+
df4
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st| next_tt| max_tt| max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
| 1| 1981-01-13| hit| null| good|1981-01-14|1981-01-15|1981-01-16|
| 4| 1981-01-06| hit| null| good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+
df5
+---+------+----------+-----------+
| id|status|start_time|finish_time|
+---+------+----------+-----------+
| 4| good|1981-01-07| 1981-01-10|
| 1| OK|1981-01-14| 1981-01-16|
+---+------+----------+-----------+