将 Pandas 代码转换为 Pyspark 的问题

Issue with transforming Pandas code to Pyspark

我需要一些帮助来将我的 Pandas 代码转换为 PySpark。
我对 PySpark 很陌生,面临着这种转变的问题。

我有多个商店内所售商品的每周销售数据。
初始数据如下所示:

store_id item_id week sales
store1 item1 2021-01 3
store1 item2 2021-01 2
store2 item1 2021-01 10
store2 item3 2021-01 1
store1 item1 2021-02 5
store1 item2 2021-02 1
store2 item1 2021-02 11
store1 item3 2021-03 6
store1 item1 2021-04 7
store2 item3 2021-04 2

第 1 步:我想将其转换为每个(商店、项目)组合的单个条目。我想为周列中的每个唯一条目创建单独的销售额列。

因此,我在 Pandas 中使用它创建了以下 Dataframe:

df2 = df.groupby(['store_id', 'item_id', 'week'])['sales'].sum().unstack(fill_value=0)
df2 = df2.add_prefix('sales_week_')
store_id item_id sales_week_2021-01 sales_week_2021-02 sales_week_2021-03 sales_week_2021-04
store1 item1 3 5 0 7
store1 item2 2 1 0 0
store1 item3 0 0 6 0
store2 item1 10 11 0 0
store2 item3 1 0 0 2

第 2 步:现在如果任何 item_id 在开始几周内的销售额为 0,这可能意味着该商品稍后才上架,因此我们想用 nan 替换这些值。我在 pandas 中使用以下内容:

df2 = df2.mask(df2.cumsum(1).eq(0), np.nan)

现在,store1 和 item3 包含 nan 个 2021 年前两周的条目。

store_id item_id sales_week_2021-01 sales_week_2021-02 sales_week_2021-03 sales_week_2021-04
store1 item1 3 5 0 7
store1 item2 2 1 0 0
store1 item3 nan nan 6 0
store2 item1 10 11 0 0
store2 item3 1 0 0 2

我正在试用 PySpark 等价物

第 1 步:

columns = ["store_id","item_id", "week", "sales"]
data = [("store1","item1","2021-01",3),
("store1","item2","2021-01",2),
("store2","item1","2021-01",0),
("store2","item3","2021-01",1),
("store1","item1","2021-02",5),
("store1","item2","2021-02",1),
("store2","item1","2021-02",1),
("store1","item3","2021-03",6),
("store1","item1","2021-04",7),
("store2","item3","2021-04",2)]

df = spark.sparkContext.parallelize(data).toDF(columns)

df2 = df.groupBy('store_id','item_id').pivot('week').sum('sales')

# It fills NA at all empty places by default, so :
df2=df2.na.fill(0)

#Renaming column headers:
weeks = df2.schema.names[2:]
new_weeks = [('sales_week_' + week) for week in weeks]
for i in range(0, len(weeks)):
    df2 = df2.withColumnRenamed(weeks[i], new_weeks[i])

第 2 步: 我被困在这一点上,不确定如何使用 PySpark 实现这种转换。我早些时候收到了关于如何以 pandas 方式使用 mask & cumsum 的信息:Pandas fill NaN in columns based on some conditions

欢迎任何意见。

我终于能够像这样完成它 - 遵循的逻辑类似于 pandas 代码。创建一个具有行累积和的新数据框,在 cumulative-sum = 0.

的任何地方用 NAN 替换值

第 1 步:使用 RDD-map 运算获取数周的累计和行。

output_cols = ["store_id2", "item_id2"] + [('cumsum_week_' + week) for week in weeks]

def func1(x):
    result = []
    cur = x[2]
    result.append(cur)
    
    for i in range(1, len(weeks)):
        cur += x[i+2]
        result.append(cur)
    return (x[0],x[1],) + tuple(result)

rdd=df2.rdd.map(lambda x: func1(x))  
df3=rdd.toDF(output_cols)

第 2 步:使用 spark sql 操作首先连接两个数据集,并使用 when-otherwise 子句根据值 0 应用 if-else。

from pyspark.sql import functions as F
from pyspark.sql.functions import col

cols = list(set(df2.columns))

joindf = df2.join(df3, (df2.store_id == df3.store_id2) & (df2.item_id == df3.item_id2), "full")

for week in weeks:
    joindf = joindf.withColumn("sales_week_"+week, F.when((col("cumsum_week_"+week) > 0), col("sales_week_"+week)).otherwise(np.nan))

joindf = joindf.select(cols)
joindf.show()

这可能不是最佳方法,但它适用于大于 10 MM 记录的庞大数据集,所以我很好!