pyspark 只删除连续的重复行

pyspark remove just consecutive duplicated rows

经过一些迭代并通过 batch_date 连接后,我得到了一个数据框。让我们关注 r_id0==0r_id0==1:

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 1|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-02-28|2020-02-28|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-03-31|2020-03-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
|    2|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 4|fa1cfe1eaf9b14f88...|
|    2|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 3|e4ffd0cd4a6cf1193...|
|    2|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 23|0fdb24b8fcf8603ee...|
|    3|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 3|a3a6870ca9b42ad06...|
|    3|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 2|18b24f88271e99618...|
|    4|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 5|2fe50db0156cfc909...|
|    6|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 6|7c6d329b73d9de59f...|
|    6|2020-04-30|2020-04-30|9999-12-31|Esto es un campo 77|d70c9340f83167e95...|
+-----+----------+----------+----------+--------------------+--------------------+

我确实需要删除具有相同 hash 列的行,但只需要删除 batch_date 中连续月份的行。在那种情况下,我会获取从第一次出现重复项到最后一个 直到 列的值的所有内容。非连续重复项必须保持不变。

示例

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 42|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 42|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc... 

我为 r_id0==1 执行了我想要的功能,但它也会修改 r_id0==0 案例:

def removing_duplicates2(final_df):
    '''
    Removing duplicates. It takes all from first time they appear and last "until" column.
    '''
    # we create this list in order to avor_id iteration over the column "hash", which will be our groupby column
    iterating_columns= final_df.columns
    iterating_columns.remove("hash")
    exprs =  [F.first(x).alias(x) if x!="until" else F.last(x).alias(x)  for x in iterating_columns] 
    return  (
         final_df.groupBy("hash").agg(*exprs)
            .dropDuplicates(["hash"]).select(columns)
         )

这是结果,破坏了r_id0==0:

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 42|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc

所以,我很疑惑。我必须用 Pyspark 来做,这个例子将用于一个巨大的,该死的巨大 table,由 batch_date 分区。 我坚信我创建的每个循环都是朝着服务器爆炸和消防员责备我迈出的一步(我已经在使用一个循环(用于迭代 batch_date))。

对于冗长的描述,我们深表歉意, 任何意见或建议都非常受欢迎。

谢谢!

我们的想法是在第一步中使用 Window 识别只有连续月份的组。然后仅对第一步中找到的行进行分组,同时保持所有其他行不变。

创建一些测试数据:

from datetime import date
data = [[0, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("2020-02-27"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-04-30"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-05-01"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [1, date.fromisoformat("2019-12-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-02-28"), date.fromisoformat("2020-02-28"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [2, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "abcd"]]

df=spark.createDataFrame(data, schema=["r_id0","batch_date","from","until","hash"])

识别仅包含连续月份的 r_id0 的那些值:

from pyspark.sql import functions as F

w = Window.partitionBy("r_id0").orderBy("batch_date")
df2=df.withColumn("prev_bd_month", F.month(F.lag("batch_date").over(w)))\
    .withColumn("prev_bd_year", F.year(F.lag("batch_date").over(w))) \
    .withColumn("adj", F.when((F.year("batch_date").eqNullSafe(F.col("prev_bd_year")) 
                              & (F.month("batch_date") - F.col("prev_bd_month") == 1)) 
                              | ((F.year("batch_date") - F.col("prev_bd_year") == 1) 
                              & (F.month("batch_date") == 1) & (F.col("prev_bd_month") == 12) )
                              | F.col("prev_bd_year").isNull() ,1).otherwise(None)) \
    .groupBy("r_id0") \
    .agg(F.count("*").alias("count_all"), F.sum("adj").alias("count_adj")) \
    .withColumn("all_adj", F.col("count_all") == F.col("count_adj")) \
    .drop("count_all", "count_adj") \
    .join(df, "r_id0") \
    .cache()

中间结果:

+-----+-------+----------+----------+----------+-----------------+
|r_id0|all_adj|batch_date|      from|     until|             hash|
+-----+-------+----------+----------+----------+-----------------+
|    0|  false|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|  false|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|  false|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
|    1|   true|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-01-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-02-28|2020-02-28|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-03-31|2020-03-31|9999-12-31| 63b3e8201cd417bb|
|    2|   true|2020-03-31|2020-03-31|9999-12-31|             abcd|
+-----+-------+----------+----------+----------+-----------------+

all_adj == true 的行分组并保留所有其他行:

df3=df2.filter("all_adj == true") \
    .groupBy("r_id0") \
    .agg(F.min("batch_date").alias("batch_date"), 
         F.expr("min_by(from, batch_date)").alias("from"),
         F.max("until").alias("until"),
         F.expr("min_by(hash, batch_date)").alias("hash")) \
    .union(df2.filter("all_adj == false").drop("all_adj"))

结果:

+-----+----------+----------+----------+-----------------+
|r_id0|batch_date|      from|     until|             hash|
+-----+----------+----------+----------+-----------------+
|    1|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    2|2020-03-31|2020-03-31|9999-12-31|             abcd|
|    0|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
+-----+----------+----------+----------+-----------------+