PySpark - "compressing" multiple-row customers into one row, deleting blanks
So I currently have a DataFrame that looks like this:
+-------------+----------------+---------------+------------------+-----------------+
| customer_id | init_base_date | init_end_date | reinit_base_date | reinit_end_date |
+-------------+----------------+---------------+------------------+-----------------+
| ... | | | | |
| A | 2015-07-30 | | | |
| A | | 2016-07-24 | | |
| B | 2015-07-10 | | | |
| B | | 2015-10-05 | | |
| B | | | 2016-01-09 | |
| B | | | | 2016-07-04 |
| C | 2015-05-13 | | | |
| C | | 2015-08-09 | | |
| ... | | | | |
+-------------+----------------+---------------+------------------+-----------------+
And I need to convert it into the following:
+-------------+----------------+---------------+------------------+-----------------+
| customer_id | init_base_date | init_end_date | reinit_base_date | reinit_end_date |
+-------------+----------------+---------------+------------------+-----------------+
| ... | | | | |
| A | 2015-07-30 | 2016-07-24 | | |
| B | 2015-07-10 | 2015-10-05 | 2016-01-09 | 2016-07-04 |
| C | 2015-05-13 | 2015-08-09 | | |
| ... | | | | |
+-------------+----------------+---------------+------------------+-----------------+
I can think of a few very tedious ways to do this, but I'm wondering if there is a quick and efficient way (maybe using windows? I've only been using PySpark for a month now, so I'm definitely still a novice).
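For reference, a DataFrame like the one above can be built roughly as follows. This is just a minimal sketch for testing; the schema is an assumption, with the dates kept as plain strings and the blanks modeled as real nulls:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical sample data mirroring the table above; blanks are nulls
df = spark.createDataFrame(
    [
        ('A', '2015-07-30', None, None, None),
        ('A', None, '2016-07-24', None, None),
        ('B', '2015-07-10', None, None, None),
        ('B', None, '2015-10-05', None, None),
        ('B', None, None, '2016-01-09', None),
        ('B', None, None, None, '2016-07-04'),
        ('C', '2015-05-13', None, None, None),
        ('C', None, '2015-08-09', None, None),
    ],
    schema='customer_id string, init_base_date string, init_end_date string, '
           'reinit_base_date string, reinit_end_date string',
)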
If those blank cells you show are actually nulls (as opposed to empty strings), you can use pyspark.sql.functions.first() as the aggregate function in a groupBy. The key is to set the ignorenulls argument of first() to True (it defaults to False).
import pyspark.sql.functions as f
cols = [c for c in df.columns if c != 'customer_id']
# one row per customer: take the first non-null value in each column
df.groupBy('customer_id').agg(*[f.first(c, True).alias(c) for c in cols]).show()
#+-----------+--------------+-------------+----------------+---------------+
#|customer_id|init_base_date|init_end_date|reinit_base_date|reinit_end_date|
#+-----------+--------------+-------------+----------------+---------------+
#| A| 2015-07-30| 2016-07-24| null| null|
#| B| 2015-07-10| 2015-10-05| 2016-01-09| 2016-07-04|
#| C| 2015-05-13| 2015-08-09| null| null|
#+-----------+--------------+-------------+----------------+---------------+
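Since the question asks about windows: the same result can also be obtained with first(..., ignorenulls=True) over a window partitioned by customer_id, followed by dropDuplicates. This is only a sketch of that variant; the groupBy above is usually the simpler and cheaper option.

from pyspark.sql import Window
import pyspark.sql.functions as f

w = Window.partitionBy('customer_id')
cols = [c for c in df.columns if c != 'customer_id']
# fill every row of a customer with the first non-null value per column,
# then keep a single row per customer
windowed = df.select(
    'customer_id',
    *[f.first(c, ignorenulls=True).over(w).alias(c) for c in cols]
).dropDuplicates(['customer_id'])
windowed.show()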
If those blank values are actually empty strings, you can first use pyspark.sql.functions.when() to replace them with null and then follow the approach above. Afterwards you can (optionally) replace the null values with blanks again.
from functools import reduce # for python3
cols = [c for c in df.columns if c != 'customer_id']
df = reduce(lambda df, c: df.withColumn(c, f.when(f.col(c) != '', f.col(c))), cols, df)
df = df.groupBy('customer_id').agg(*[f.first(c, True).alias(c) for c in cols])
df.na.fill('').show() # fill nulls with blanks
#+-----------+--------------+-------------+----------------+---------------+
#|customer_id|init_base_date|init_end_date|reinit_base_date|reinit_end_date|
#+-----------+--------------+-------------+----------------+---------------+
#| A| 2015-07-30| 2016-07-24| | |
#| B| 2015-07-10| 2015-10-05| 2016-01-09| 2016-07-04|
#| C| 2015-05-13| 2015-08-09| | |
#+-----------+--------------+-------------+----------------+---------------+
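One detail worth noting about the snippet above: f.when(f.col(c) != '', f.col(c)) has no .otherwise() clause, so rows where the condition is not met (the empty strings) come back as null, which is exactly what lets first(..., ignorenulls=True) skip them. The final na.fill('') simply turns the remaining nulls back into blanks for display.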