PySpark 中的等效 Pandas 掩码和移位
Equivalent Pandas masking and shift in PySpark
我一直在 Python 上构建我的应用程序,但出于某种原因我需要将它放在分布式环境中,所以我正在尝试构建和应用
使用 Spark 但无法像 Pandas 中的 shift
一样快地生成代码。
mask = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \
(df['age_x'].shift(0) == df['age_y'].shift(0))
df = df[~mask1]
在哪里
mask.tolist()
给予
[True, False, True, False]
最终结果 df
将仅包含两行(第 2 行和第 4 行)。
基本上尝试删除 [name_x、age_x]col 重复的行(如果出现在 [name_y、age_y]col.
上)
以上代码在 Pandas 数据帧上。最接近的 PySpark 代码是什么,它既高效又不导入 Pandas?
我在 Spark 上检查过 Window
但不确定。
shift
在您的代码中不起作用。这个
import pandas as pd
df = pd.DataFrame({
"name_x" : ["ABC", "CDF", "DEW", "ABC"],
"age_x": [20, 20, 22, 21],
"name_y" : ["ABC", "CDF", "DEW", "ABC"],
"age_y" : [20, 21, 22, 19],
})
mask1 = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \
(df['age_x'].shift(0) == df['age_y'].shift(0))
df[~mask1]
# name_x age_x name_y age_y
# 1 CDF 20 CDF 21
# 3 ABC 21 ABC 19
正好等同于
mask2 = (df['name_x'] == df['name_y']) & (df['age_x'] == df['age_y'])
df[~mask2]
# name_x age_x name_y age_y
# 1 CDF 20 CDF 21
# 3 ABC 21 ABC 19
因此你只需要过滤器:
sdf = spark.createDataFrame(df)
smask = ~((sdf["name_x"] == sdf["name_y"]) & (sdf["age_x"] == sdf["age_y"]))
sdf.filter(smask).show()
# +------+-----+------+-----+
# |name_x|age_x|name_y|age_y|
# +------+-----+------+-----+
# | CDF| 20| CDF| 21|
# | ABC| 21| ABC| 19|
# +------+-----+------+-----+
根据德摩根定律,可以简化为
(sdf["name_x"] != sdf["name_y"]) | (sdf["age_x"] != sdf["age_y"])
一般来说,shift
可以用表示。
我一直在 Python 上构建我的应用程序,但出于某种原因我需要将它放在分布式环境中,所以我正在尝试构建和应用
使用 Spark 但无法像 Pandas 中的 shift
一样快地生成代码。
mask = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \
(df['age_x'].shift(0) == df['age_y'].shift(0))
df = df[~mask1]
在哪里
mask.tolist()
给予
[True, False, True, False]
最终结果 df
将仅包含两行(第 2 行和第 4 行)。
基本上尝试删除 [name_x、age_x]col 重复的行(如果出现在 [name_y、age_y]col.
以上代码在 Pandas 数据帧上。最接近的 PySpark 代码是什么,它既高效又不导入 Pandas?
我在 Spark 上检查过 Window
但不确定。
shift
在您的代码中不起作用。这个
import pandas as pd
df = pd.DataFrame({
"name_x" : ["ABC", "CDF", "DEW", "ABC"],
"age_x": [20, 20, 22, 21],
"name_y" : ["ABC", "CDF", "DEW", "ABC"],
"age_y" : [20, 21, 22, 19],
})
mask1 = (df['name_x'].shift(0) == df['name_y'].shift(0)) & \
(df['age_x'].shift(0) == df['age_y'].shift(0))
df[~mask1]
# name_x age_x name_y age_y
# 1 CDF 20 CDF 21
# 3 ABC 21 ABC 19
正好等同于
mask2 = (df['name_x'] == df['name_y']) & (df['age_x'] == df['age_y'])
df[~mask2]
# name_x age_x name_y age_y
# 1 CDF 20 CDF 21
# 3 ABC 21 ABC 19
因此你只需要过滤器:
sdf = spark.createDataFrame(df)
smask = ~((sdf["name_x"] == sdf["name_y"]) & (sdf["age_x"] == sdf["age_y"]))
sdf.filter(smask).show()
# +------+-----+------+-----+
# |name_x|age_x|name_y|age_y|
# +------+-----+------+-----+
# | CDF| 20| CDF| 21|
# | ABC| 21| ABC| 19|
# +------+-----+------+-----+
根据德摩根定律,可以简化为
(sdf["name_x"] != sdf["name_y"]) | (sdf["age_x"] != sdf["age_y"])
一般来说,shift
可以用