将 Pandas 代码转换为 Pyspark 的问题
Issue with transforming Pandas code to Pyspark
我需要一些帮助来将我的 Pandas 代码转换为 PySpark。
我对 PySpark 很陌生,面临着这种转变的问题。
我有多个商店内所售商品的每周销售数据。
初始数据如下所示:
store_id
item_id
week
sales
store1
item1
2021-01
3
store1
item2
2021-01
2
store2
item1
2021-01
10
store2
item3
2021-01
1
store1
item1
2021-02
5
store1
item2
2021-02
1
store2
item1
2021-02
11
store1
item3
2021-03
6
store1
item1
2021-04
7
store2
item3
2021-04
2
第 1 步:我想将其转换为每个(商店、项目)组合的单个条目。我想为周列中的每个唯一条目创建单独的销售额列。
因此,我在 Pandas 中使用它创建了以下 Dataframe:
df2 = df.groupby(['store_id', 'item_id', 'week'])['sales'].sum().unstack(fill_value=0)
df2 = df2.add_prefix('sales_week_')
store_id
item_id
sales_week_2021-01
sales_week_2021-02
sales_week_2021-03
sales_week_2021-04
store1
item1
3
5
0
7
store1
item2
2
1
0
0
store1
item3
0
0
6
0
store2
item1
10
11
0
0
store2
item3
1
0
0
2
第 2 步:现在如果任何 item_id 在开始几周内的销售额为 0,这可能意味着该商品稍后才上架,因此我们想用 nan 替换这些值。我在 pandas 中使用以下内容:
df2 = df2.mask(df2.cumsum(1).eq(0), np.nan)
现在,store1 和 item3 包含 nan
个 2021 年前两周的条目。
store_id
item_id
sales_week_2021-01
sales_week_2021-02
sales_week_2021-03
sales_week_2021-04
store1
item1
3
5
0
7
store1
item2
2
1
0
0
store1
item3
nan
nan
6
0
store2
item1
10
11
0
0
store2
item3
1
0
0
2
我正在试用 PySpark 等价物
第 1 步:
columns = ["store_id","item_id", "week", "sales"]
data = [("store1","item1","2021-01",3),
("store1","item2","2021-01",2),
("store2","item1","2021-01",0),
("store2","item3","2021-01",1),
("store1","item1","2021-02",5),
("store1","item2","2021-02",1),
("store2","item1","2021-02",1),
("store1","item3","2021-03",6),
("store1","item1","2021-04",7),
("store2","item3","2021-04",2)]
df = spark.sparkContext.parallelize(data).toDF(columns)
df2 = df.groupBy('store_id','item_id').pivot('week').sum('sales')
# It fills NA at all empty places by default, so :
df2=df2.na.fill(0)
#Renaming column headers:
weeks = df2.schema.names[2:]
new_weeks = [('sales_week_' + week) for week in weeks]
for i in range(0, len(weeks)):
df2 = df2.withColumnRenamed(weeks[i], new_weeks[i])
第 2 步:
我被困在这一点上,不确定如何使用 PySpark 实现这种转换。我早些时候收到了关于如何以 pandas 方式使用 mask & cumsum 的信息:Pandas fill NaN in columns based on some conditions
欢迎任何意见。
我终于能够像这样完成它 - 遵循的逻辑类似于 pandas 代码。创建一个具有行累积和的新数据框,在 cumulative-sum = 0.
的任何地方用 NAN 替换值
第 1 步:使用 RDD-map 运算获取数周的累计和行。
output_cols = ["store_id2", "item_id2"] + [('cumsum_week_' + week) for week in weeks]
def func1(x):
result = []
cur = x[2]
result.append(cur)
for i in range(1, len(weeks)):
cur += x[i+2]
result.append(cur)
return (x[0],x[1],) + tuple(result)
rdd=df2.rdd.map(lambda x: func1(x))
df3=rdd.toDF(output_cols)
第 2 步:使用 spark sql 操作首先连接两个数据集,并使用 when-otherwise 子句根据值 0 应用 if-else。
from pyspark.sql import functions as F
from pyspark.sql.functions import col
cols = list(set(df2.columns))
joindf = df2.join(df3, (df2.store_id == df3.store_id2) & (df2.item_id == df3.item_id2), "full")
for week in weeks:
joindf = joindf.withColumn("sales_week_"+week, F.when((col("cumsum_week_"+week) > 0), col("sales_week_"+week)).otherwise(np.nan))
joindf = joindf.select(cols)
joindf.show()
这可能不是最佳方法,但它适用于大于 10 MM 记录的庞大数据集,所以我很好!
我需要一些帮助来将我的 Pandas 代码转换为 PySpark。
我对 PySpark 很陌生,面临着这种转变的问题。
我有多个商店内所售商品的每周销售数据。
初始数据如下所示:
store_id | item_id | week | sales |
---|---|---|---|
store1 | item1 | 2021-01 | 3 |
store1 | item2 | 2021-01 | 2 |
store2 | item1 | 2021-01 | 10 |
store2 | item3 | 2021-01 | 1 |
store1 | item1 | 2021-02 | 5 |
store1 | item2 | 2021-02 | 1 |
store2 | item1 | 2021-02 | 11 |
store1 | item3 | 2021-03 | 6 |
store1 | item1 | 2021-04 | 7 |
store2 | item3 | 2021-04 | 2 |
第 1 步:我想将其转换为每个(商店、项目)组合的单个条目。我想为周列中的每个唯一条目创建单独的销售额列。
因此,我在 Pandas 中使用它创建了以下 Dataframe:
df2 = df.groupby(['store_id', 'item_id', 'week'])['sales'].sum().unstack(fill_value=0)
df2 = df2.add_prefix('sales_week_')
store_id | item_id | sales_week_2021-01 | sales_week_2021-02 | sales_week_2021-03 | sales_week_2021-04 |
---|---|---|---|---|---|
store1 | item1 | 3 | 5 | 0 | 7 |
store1 | item2 | 2 | 1 | 0 | 0 |
store1 | item3 | 0 | 0 | 6 | 0 |
store2 | item1 | 10 | 11 | 0 | 0 |
store2 | item3 | 1 | 0 | 0 | 2 |
第 2 步:现在如果任何 item_id 在开始几周内的销售额为 0,这可能意味着该商品稍后才上架,因此我们想用 nan 替换这些值。我在 pandas 中使用以下内容:
df2 = df2.mask(df2.cumsum(1).eq(0), np.nan)
现在,store1 和 item3 包含 nan
个 2021 年前两周的条目。
store_id | item_id | sales_week_2021-01 | sales_week_2021-02 | sales_week_2021-03 | sales_week_2021-04 |
---|---|---|---|---|---|
store1 | item1 | 3 | 5 | 0 | 7 |
store1 | item2 | 2 | 1 | 0 | 0 |
store1 | item3 | nan | nan | 6 | 0 |
store2 | item1 | 10 | 11 | 0 | 0 |
store2 | item3 | 1 | 0 | 0 | 2 |
我正在试用 PySpark 等价物
第 1 步:
columns = ["store_id","item_id", "week", "sales"]
data = [("store1","item1","2021-01",3),
("store1","item2","2021-01",2),
("store2","item1","2021-01",0),
("store2","item3","2021-01",1),
("store1","item1","2021-02",5),
("store1","item2","2021-02",1),
("store2","item1","2021-02",1),
("store1","item3","2021-03",6),
("store1","item1","2021-04",7),
("store2","item3","2021-04",2)]
df = spark.sparkContext.parallelize(data).toDF(columns)
df2 = df.groupBy('store_id','item_id').pivot('week').sum('sales')
# It fills NA at all empty places by default, so :
df2=df2.na.fill(0)
#Renaming column headers:
weeks = df2.schema.names[2:]
new_weeks = [('sales_week_' + week) for week in weeks]
for i in range(0, len(weeks)):
df2 = df2.withColumnRenamed(weeks[i], new_weeks[i])
第 2 步: 我被困在这一点上,不确定如何使用 PySpark 实现这种转换。我早些时候收到了关于如何以 pandas 方式使用 mask & cumsum 的信息:Pandas fill NaN in columns based on some conditions
欢迎任何意见。
我终于能够像这样完成它 - 遵循的逻辑类似于 pandas 代码。创建一个具有行累积和的新数据框,在 cumulative-sum = 0.
的任何地方用 NAN 替换值第 1 步:使用 RDD-map 运算获取数周的累计和行。
output_cols = ["store_id2", "item_id2"] + [('cumsum_week_' + week) for week in weeks]
def func1(x):
result = []
cur = x[2]
result.append(cur)
for i in range(1, len(weeks)):
cur += x[i+2]
result.append(cur)
return (x[0],x[1],) + tuple(result)
rdd=df2.rdd.map(lambda x: func1(x))
df3=rdd.toDF(output_cols)
第 2 步:使用 spark sql 操作首先连接两个数据集,并使用 when-otherwise 子句根据值 0 应用 if-else。
from pyspark.sql import functions as F
from pyspark.sql.functions import col
cols = list(set(df2.columns))
joindf = df2.join(df3, (df2.store_id == df3.store_id2) & (df2.item_id == df3.item_id2), "full")
for week in weeks:
joindf = joindf.withColumn("sales_week_"+week, F.when((col("cumsum_week_"+week) > 0), col("sales_week_"+week)).otherwise(np.nan))
joindf = joindf.select(cols)
joindf.show()
这可能不是最佳方法,但它适用于大于 10 MM 记录的庞大数据集,所以我很好!