创建 pyspark 逻辑来填充列

Create pyspark logic to populate columns

我有 2 个数据框,一个是今天的,一个是昨天的,我需要帮助来创建一个逻辑来比较它们并创建一个包含以下列的新数据框:

customer_id -> 用于比较的列

person_id -> 用于比较的列

type_person -> 如果type_person列与前一天相同,则保留,如果不更新为今天的新状态。

anterior_type -> 如果 type_person 列已更新,请将之前的状态放在这里。

update_date -> 如果有新的 type_person.

,则更新记录的日期

create_date -> 创建记录的日期。

今天的数据框:

customer_id person_id type_person insert_date
afabd2d2 4a5ae8a5-6682-467... Online 2022-03-03
afabd2d2 1be8d3e8-8075-438... Online 2022-03-03
afabd2d2 6912dadc-1692-4bd... Online 2022-03-03
afabd2d2 e48cba37-113c-4bd... Online 2022-03-03
afabd2d2 831cb669-b2ae-4e8... Online 2022-03-03
afabd2d2 69161fe5-62ac-400... Hybrid 2022-03-03
afabd2d2 b48b59a0-92eb-410... Hybrid 2022-03-03

昨天的数据框:

customer_id person_id type_person insert_date
afabd2d2 4a5ae8a5-6682-467... Online 2022-03-02
afabd2d2 1be8d3e8-8075-438... Online 2022-03-02
afabd2d2 6912dadc-1692-4bd... Online 2022-03-02
afabd2d2 e48cba37-113c-4bd... Online 2022-03-02
afabd2d2 831cb669-b2ae-4e8... Online 2022-03-02
afabd2d2 69161fe5-62ac-400... Online 2022-03-02
afabd2d2 b48b59a0-92eb-410... Online 2022-03-02

对于今天的数据框,我有 2 行更改为混合,因此它们必须具有 update_dateanterior_type,其他保持不变,并且其他空字段在这个数据框。

我调整示例数据以演示新记录方案。我添加了最后一行。

customer_id person_id type_person insert_date
afabd2d2 4a5ae8a5-6682-467... Online 2022-03-03
afabd2d2 1be8d3e8-8075-438... Online 2022-03-03
afabd2d2 6912dadc-1692-4bd... Online 2022-03-03
afabd2d2 e48cba37-113c-4bd... Online 2022-03-03
afabd2d2 831cb669-b2ae-4e8... Online 2022-03-03
afabd2d2 69161fe5-62ac-400... Hybrid 2022-03-03
afabd2d2 b48b59a0-92eb-410... Hybrid 2022-03-03
afabd2d2 xxxxxxxx-xxxx-xxx... Online 2022-03-03

要比较差异,可以先join 2个dataframes。

# dft: today's dataframe, dfy: yesterday's dataframe
# If you also want to track the deletion, change how from left to outer.
df = dft.join(dfy, on=['customer_id', 'person_id'], how='left')

然后,使用when进行比较。

df = (df.withColumn('anterior_type', (F.when(dfy.type_person.isNull(), 'New')
                                       .when(dfy.type_person != dft.type_person, dfy.type_person)))
        .withColumn('update_date', F.when(dfy.type_person.isNull() | (dfy.type_person != dft.type_person), dft.insert_date))

结果

|customer_id|  person_id|type_person|insert_date|type_person|insert_date|anterior_type|update_date|
|-----------+-----------+-----------+-----------+-----------+-----------|-------------+-----------|
|   afabd2d2|4a5ae8a5...|     Online| 2022-03-03|     Online| 2022-03-02|         null|       null|
|   afabd2d2|1be8d3e8...|     Online| 2022-03-03|     Online| 2022-03-02|         null|       null|
|   afabd2d2|6912dadc...|     Online| 2022-03-03|     Online| 2022-03-02|         null|       null|
|   afabd2d2|e48cba37...|     Online| 2022-03-03|     Online| 2022-03-02|         null|       null|
|   afabd2d2|831cb669...|     Online| 2022-03-03|     Online| 2022-03-02|         null|       null|
|   afabd2d2|69161fe5...|     Hybrid| 2022-03-03|     Online| 2022-03-02|       Online| 2022-03-03|
|   afabd2d2|b48b59a0...|     Hybrid| 2022-03-03|     Online| 2022-03-02|       Online| 2022-03-03|
|   afabd2d2|xxxxxxxx...|     Online| 2022-03-03|       null|       null|          New| 2022-03-03|

如果 anterior_typeupdate_date 需要 null 以外的值,您可以在 when 之后链接 otherwise 以添加值。