PySpark 中的等效 Pandas 掩码和移位

Equivalent Pandas masking and shift in PySpark

我一直在 Python 上构建我的应用程序,但出于某种原因我需要将它放在分布式环境中,所以我正在尝试构建和应用

使用 Spark 但无法像 Pandas 中的 shift 一样快地生成代码。

mask = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \ 
(df['age_x'].shift(0) == df['age_y'].shift(0))
df = df[~mask1]

在哪里

mask.tolist()                                               

给予

[True, False, True, False]

最终结果 df 将仅包含两行(第 2 行和第 4 行)。 基本上尝试删除 [name_x、age_x]col 重复的行(如果出现在 [name_y、age_y]col.

上)

以上代码在 Pandas 数据帧上。最接近的 PySpark 代码是什么,它既高效又不导入 Pandas?

我在 Spark 上检查过 Window 但不确定。

shift 在您的代码中不起作用。这个

import pandas as pd 

df = pd.DataFrame({
    "name_x" : ["ABC", "CDF", "DEW", "ABC"],
    "age_x": [20, 20, 22, 21],
    "name_y" : ["ABC", "CDF", "DEW", "ABC"],
    "age_y" : [20, 21, 22, 19],
})

mask1 = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \
  (df['age_x'].shift(0) == df['age_y'].shift(0))
df[~mask1]

#  name_x  age_x name_y  age_y
# 1    CDF     20    CDF     21
# 3    ABC     21    ABC     19

正好等同于

mask2 = (df['name_x'] == df['name_y']) & (df['age_x'] == df['age_y'])
df[~mask2]

#   name_x  age_x name_y  age_y
# 1    CDF     20    CDF     21
# 3    ABC     21    ABC     19

因此你只需要过滤器:

sdf = spark.createDataFrame(df)

smask = ~((sdf["name_x"] == sdf["name_y"]) & (sdf["age_x"] == sdf["age_y"]))
sdf.filter(smask).show()
# +------+-----+------+-----+
# |name_x|age_x|name_y|age_y|
# +------+-----+------+-----+
# |   CDF|   20|   CDF|   21|
# |   ABC|   21|   ABC|   19|
# +------+-----+------+-----+

根据德摩根定律,可以简化为

(sdf["name_x"] != sdf["name_y"]) | (sdf["age_x"] != sdf["age_y"])

一般来说,shift可以用表示。