pyspark 操作没有扩展
pyspark operations not scaling up
我在笔记本中有一段代码可以正常工作,但在无休止的计算和 java.lang.OutOfMemoryError 的更大数据上失败了:Java 堆 space.
过程如下:
模拟 pyspark 数据
我从一个包含 3 列的数据框开始,即(用户、时间和项目),如下面的代码所示:
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df_schema = StructType([ StructField("User", StringType(), True)\
,StructField("Time", IntegerType(), True)\
,StructField("Item", StringType(), True)])
pddf = pd.DataFrame([["u1",1,"A"],
["u1",1,"A"],
["u1",2,"A"],
["u1",3,"B"],
["u1",3,"C"],
["u1",4,"B"],
["u2",1,"D"],
["u2",2,"D"],
["u2",2,"A"],
["u2",2,"F"],
["u2",3,"D"],
["u2",3,"A"],],columns=["User", "Time", "Item"])
df = spark.createDataFrame(pddf,schema=df_schema)
df.show()
这给出了
+----+----+----+
|User|Time|Item|
+----+----+----+
| u1| 1| A|
| u1| 1| A|
| u1| 2| A|
| u1| 3| B|
| u1| 3| C|
| u1| 4| B|
| u2| 1| D|
| u2| 2| D|
| u2| 2| A|
| u2| 2| F|
| u2| 3| D|
| u2| 3| A|
+----+----+----+
中间步骤
然后我为每个用户计算前 n 个最常见的项目,并创建一个包含新列 uc(uc 表示不常见)的数据框,如果项目是 [=46,则设置为 0 =]在 topn 列表中 否则为 1。
import pyspark.sql.functions as F
from pyspark.sql import Window
ArrayOfTupleType = ArrayType(StructType([
StructField("itemId", StringType(), False),
StructField("count", IntegerType(), False)
]))
@F.udf(returnType=ArrayOfTupleType)
def most_common(x, topn=2):
from collections import Counter
c = Counter(x)
mc = c.most_common(topn)
return mc
topn=2
w0 = Window.partitionBy("User")
dfd = (df.withColumn("Item_freq", most_common(F.collect_list("Item").over(w0), F.lit(topn)))
.select("User", "Time" , "Item" , "Item_freq")
.withColumn("mcs", F.col("Item_freq.itemId"))
.withColumn("uc", F.when(F.expr("array_contains(mcs, Item)"), 0).otherwise(1)).cache())
dfd.select("User", "Time", "Item" , "mcs" , "uc").show()
下面给出中间数据框
+----+----+----+------+---+
|User|Time|Item|mcs |uc |
+----+----+----+------+---+
|u1 |1 |A |[A, B]|0 |
|u1 |1 |A |[A, B]|0 |
|u1 |2 |A |[A, B]|0 |
|u1 |3 |B |[A, B]|0 |
|u1 |3 |C |[A, B]|1 |
|u1 |4 |B |[A, B]|0 |
|u2 |1 |D |[D, A]|0 |
|u2 |2 |D |[D, A]|0 |
|u2 |2 |A |[D, A]|0 |
|u2 |2 |F |[D, A]|1 |
|u2 |3 |D |[D, A]|0 |
|u2 |3 |A |[D, A]|0 |
+----+----+----+------+---+
聚合步骤
然后我最后按用户和时间分组,这是在真实数据上失败的操作:
uncommon = dfd.groupBy("User", "Time").agg(F.sum(F.col("uc")).alias("UncommonItem"))
uncommon.orderBy("User", "Time", ascending=True).show()
这给出了虚拟数据的预期结果
+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|u1 |1 |0 |
|u1 |2 |0 |
|u1 |3 |1 |
|u1 |4 |0 |
|u2 |1 |0 |
|u2 |2 |1 |
|u2 |3 |0 |
+----+----+------------+
但它失败了 java.lang.OutOfMemoryError: Java heap space on real data.
将 spark.driver.memory 从 6G 增加到 60G 只会让崩溃在更长时间后出现,直到它填满 60G。我的真实数据有 1907505 个输入样本
我对pyspark不是很熟悉,不知道问题出在哪里。许多其他 groupby/agg 其他操作很快,并且不会在相同类型的数据上失败。因此,我怀疑问题出在我在上面的中间步骤中制作数据框 dfd 的方式。
关于如何优化代码有什么想法吗?
如果你可以改变方法,你可以试试下面的方法:
import pyspark.sql.functions as F
topn=2
w = Window.partitionBy('User','Item')
df1 = df.withColumn("Counts",F.count('Item').over(w))
w1 = Window.partitionBy(df1["User"]).orderBy(df1['Counts'].desc())
(df1.withColumn("dummy",F.when(F.dense_rank().over(w1)<=topn,0).otherwise(1))
.groupBy('User','Time').agg(F.max("dummy").alias('UncommonItem'))).show()
+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
| u1| 1| 0|
| u1| 2| 0|
| u1| 3| 1|
| u1| 4| 0|
| u2| 1| 0|
| u2| 2| 1|
| u2| 3| 0|
+----+----+------------+
答案中遵循的步骤:
- 获取 window 用户和项目的计数
- 获取 dense_rank 用户和第 1 步中 return 的计数
- 凡排名在 2 以内的 (topn) return 1 else 0 并将其命名为 dummy
- 对用户和时间进行分组并获取虚拟对象的最大值
我在笔记本中有一段代码可以正常工作,但在无休止的计算和 java.lang.OutOfMemoryError 的更大数据上失败了:Java 堆 space.
过程如下:
模拟 pyspark 数据
我从一个包含 3 列的数据框开始,即(用户、时间和项目),如下面的代码所示:
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df_schema = StructType([ StructField("User", StringType(), True)\
,StructField("Time", IntegerType(), True)\
,StructField("Item", StringType(), True)])
pddf = pd.DataFrame([["u1",1,"A"],
["u1",1,"A"],
["u1",2,"A"],
["u1",3,"B"],
["u1",3,"C"],
["u1",4,"B"],
["u2",1,"D"],
["u2",2,"D"],
["u2",2,"A"],
["u2",2,"F"],
["u2",3,"D"],
["u2",3,"A"],],columns=["User", "Time", "Item"])
df = spark.createDataFrame(pddf,schema=df_schema)
df.show()
这给出了
+----+----+----+
|User|Time|Item|
+----+----+----+
| u1| 1| A|
| u1| 1| A|
| u1| 2| A|
| u1| 3| B|
| u1| 3| C|
| u1| 4| B|
| u2| 1| D|
| u2| 2| D|
| u2| 2| A|
| u2| 2| F|
| u2| 3| D|
| u2| 3| A|
+----+----+----+
中间步骤
然后我为每个用户计算前 n 个最常见的项目,并创建一个包含新列 uc(uc 表示不常见)的数据框,如果项目是 [=46,则设置为 0 =]在 topn 列表中 否则为 1。
import pyspark.sql.functions as F
from pyspark.sql import Window
ArrayOfTupleType = ArrayType(StructType([
StructField("itemId", StringType(), False),
StructField("count", IntegerType(), False)
]))
@F.udf(returnType=ArrayOfTupleType)
def most_common(x, topn=2):
from collections import Counter
c = Counter(x)
mc = c.most_common(topn)
return mc
topn=2
w0 = Window.partitionBy("User")
dfd = (df.withColumn("Item_freq", most_common(F.collect_list("Item").over(w0), F.lit(topn)))
.select("User", "Time" , "Item" , "Item_freq")
.withColumn("mcs", F.col("Item_freq.itemId"))
.withColumn("uc", F.when(F.expr("array_contains(mcs, Item)"), 0).otherwise(1)).cache())
dfd.select("User", "Time", "Item" , "mcs" , "uc").show()
下面给出中间数据框
+----+----+----+------+---+
|User|Time|Item|mcs |uc |
+----+----+----+------+---+
|u1 |1 |A |[A, B]|0 |
|u1 |1 |A |[A, B]|0 |
|u1 |2 |A |[A, B]|0 |
|u1 |3 |B |[A, B]|0 |
|u1 |3 |C |[A, B]|1 |
|u1 |4 |B |[A, B]|0 |
|u2 |1 |D |[D, A]|0 |
|u2 |2 |D |[D, A]|0 |
|u2 |2 |A |[D, A]|0 |
|u2 |2 |F |[D, A]|1 |
|u2 |3 |D |[D, A]|0 |
|u2 |3 |A |[D, A]|0 |
+----+----+----+------+---+
聚合步骤
然后我最后按用户和时间分组,这是在真实数据上失败的操作:
uncommon = dfd.groupBy("User", "Time").agg(F.sum(F.col("uc")).alias("UncommonItem"))
uncommon.orderBy("User", "Time", ascending=True).show()
这给出了虚拟数据的预期结果
+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|u1 |1 |0 |
|u1 |2 |0 |
|u1 |3 |1 |
|u1 |4 |0 |
|u2 |1 |0 |
|u2 |2 |1 |
|u2 |3 |0 |
+----+----+------------+
但它失败了 java.lang.OutOfMemoryError: Java heap space on real data.
将 spark.driver.memory 从 6G 增加到 60G 只会让崩溃在更长时间后出现,直到它填满 60G。我的真实数据有 1907505 个输入样本
我对pyspark不是很熟悉,不知道问题出在哪里。许多其他 groupby/agg 其他操作很快,并且不会在相同类型的数据上失败。因此,我怀疑问题出在我在上面的中间步骤中制作数据框 dfd 的方式。
关于如何优化代码有什么想法吗?
如果你可以改变方法,你可以试试下面的方法:
import pyspark.sql.functions as F
topn=2
w = Window.partitionBy('User','Item')
df1 = df.withColumn("Counts",F.count('Item').over(w))
w1 = Window.partitionBy(df1["User"]).orderBy(df1['Counts'].desc())
(df1.withColumn("dummy",F.when(F.dense_rank().over(w1)<=topn,0).otherwise(1))
.groupBy('User','Time').agg(F.max("dummy").alias('UncommonItem'))).show()
+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
| u1| 1| 0|
| u1| 2| 0|
| u1| 3| 1|
| u1| 4| 0|
| u2| 1| 0|
| u2| 2| 1|
| u2| 3| 0|
+----+----+------------+
答案中遵循的步骤:
- 获取 window 用户和项目的计数
- 获取 dense_rank 用户和第 1 步中 return 的计数
- 凡排名在 2 以内的 (topn) return 1 else 0 并将其命名为 dummy
- 对用户和时间进行分组并获取虚拟对象的最大值