删除或加速 PySpark 中的显式 for 循环
remove, or speed-up an explicit for loop in PySpark
看完问题你就会明白,我是 Spark 的新手。我正在尝试使用每个会话的操作列表创建一个新的 DataFrame 以最终调用 PySparks FP-Growth function
为了阐明我想要什么,我有:
+-----------+---------+
|sessionid |event_col|
+-----------+---------+
|0 | 1|
|1 | 2|
|1 | 3|
|2 | 1|
|0 | 3|
+-----------+---------+
并且想要:
+-----------+---------+
|sessionid | events|
+-----------+---------+
| 0| [1, 3]|
| 1| [2, 3]|
| 2| [1]|
+-----------+---------+
我用 Python 用 Pandas DataFrame 简单地制作了这个原型来获取事件列表。
sessions = []
for sess in df.sessionid.unique():
session = []
for action in df[df.sessionid == sess]["event_col"]:
session.append(action)
sessions.append(session)
我在 PySpark (2.0) 中重写了它,但实际上重新创建了 for 循环:
def sessionsbuilder(df):
df = df.select(['sessionid', 'event_col'])
sessions = []
for sess in df.select('sessionid').distinct().collect():
session = []
for action in df.where(df.sessionid == sess[0][0]).select('event_col').collect():
session.append(action)
sessions.append(session)
return sessions
正如预期的那样,这非常慢(超过 2 小时,而普通 python 和 pandas 需要 11 秒)。我检查了 关于 Spark 中的嵌套 for 循环。由于 for 循环中的列表启动,我很难为我的目的创建这种 lambda。可能有一种方法可以在没有显式 for 循环的情况下创建这样的 DataFrame 或更有效地执行此操作的方法(可能是 udf),因为我没有以这种方式利用 Spark 的强大功能。
如果您的数据框看起来像
+---------+---------+
|sessionid|event_col|
+---------+---------+
|0 |1 |
|1 |2 |
|1 |3 |
|2 |1 |
|0 |3 |
+---------+---------+
然后如 Lokesh 在上面评论中所述的 groupBy 和聚合应该足以获得输出
from pyspark.sql import functions as F
df.groupBy("sessionid").agg(F.collect_list(F.col("event_col")).alias("events")).show(truncate=False)
你应该得到想要的输出
+---------+------+
|sessionid|events|
+---------+------+
|0 |[1, 3]|
|1 |[2, 3]|
|2 |[1] |
+---------+------+
希望回答对你有帮助
看完问题你就会明白,我是 Spark 的新手。我正在尝试使用每个会话的操作列表创建一个新的 DataFrame 以最终调用 PySparks FP-Growth function
为了阐明我想要什么,我有:
+-----------+---------+
|sessionid |event_col|
+-----------+---------+
|0 | 1|
|1 | 2|
|1 | 3|
|2 | 1|
|0 | 3|
+-----------+---------+
并且想要:
+-----------+---------+
|sessionid | events|
+-----------+---------+
| 0| [1, 3]|
| 1| [2, 3]|
| 2| [1]|
+-----------+---------+
我用 Python 用 Pandas DataFrame 简单地制作了这个原型来获取事件列表。
sessions = []
for sess in df.sessionid.unique():
session = []
for action in df[df.sessionid == sess]["event_col"]:
session.append(action)
sessions.append(session)
我在 PySpark (2.0) 中重写了它,但实际上重新创建了 for 循环:
def sessionsbuilder(df):
df = df.select(['sessionid', 'event_col'])
sessions = []
for sess in df.select('sessionid').distinct().collect():
session = []
for action in df.where(df.sessionid == sess[0][0]).select('event_col').collect():
session.append(action)
sessions.append(session)
return sessions
正如预期的那样,这非常慢(超过 2 小时,而普通 python 和 pandas 需要 11 秒)。我检查了
如果您的数据框看起来像
+---------+---------+
|sessionid|event_col|
+---------+---------+
|0 |1 |
|1 |2 |
|1 |3 |
|2 |1 |
|0 |3 |
+---------+---------+
然后如 Lokesh 在上面评论中所述的 groupBy 和聚合应该足以获得输出
from pyspark.sql import functions as F
df.groupBy("sessionid").agg(F.collect_list(F.col("event_col")).alias("events")).show(truncate=False)
你应该得到想要的输出
+---------+------+
|sessionid|events|
+---------+------+
|0 |[1, 3]|
|1 |[2, 3]|
|2 |[1] |
+---------+------+
希望回答对你有帮助