与 Apache Spark (Pyspark) 的组合和质量集交叉点
Combinations And Mass Set Intersections With Apache Spark (Pyspark)
挑战:
数据是 [(u, p), (u, p), ...] 形状的 RDD,其中 u 和 p 都是字符串。我具有挑战性的期望输出是 [((p,p), u), ..],其中每个元素中的键是共享 u 的所有 p 的组合。
已经尝试过:
- 使用笛卡尔和滤波器模拟组合。
- 尝试对每个元素使用 Python 中的 itertools.combinations,然后使用 flatMap,就像这样:
[(tuple(sorted(e)), x[0]) for e in combinations(x[1].split(','),2)])
我已经尝试过的问题是执行程序节点似乎出现故障,可能是由于内存消耗。
有什么建议吗?
编辑
以下是问题的更多信息和背景:
我正在寻找每两个 "p" 交集的重叠 count/cardinality,其中 "p" 是集合 ID,"u" 是成员的一个集合。
输入是一个巨大的"p"和"u"之间的关系记录列表,例如:
[(u1,p1), (u2, p2), (u1, p2), (u2, p1), (u1, p3)]
期望的输出是:[((p1,p2), 2), ((p1, p3), 1), ((p2, p3), 1)]
(注意组合而不是排列)
关于输入数据的更多细节:
- 有 50k - 100k 个不同的 "p"s。
- 一个"p"可以有几亿个唯一的"u"。
- 可以有重复的输入数据行。换句话说,
(u,p)
的多个实例,但是在将 "u" 的列表视为交集步骤的给定 "p" 的集合时,这并没有改变问题。
至于硬件,我在 EMR(Yarn 上的 Spark 1.3.1)上使用了大约 41 m3.xlarges 并以这种方式关闭 shell:
./pyspark --master yarn-client --driver-memory 4G --executor-memory 3G --num-executors 160
yourRdd.groupByKey().map(lambda (a,b): (b,a))
group by key会将"u"的所有值聚合成
[(u1,(p1,p2,p3..,px)),(ux,(p1,...,px))]
地图将交换键和值,这应该会提供所需的输出。当没有足够的内存可用时,groupByKey 会溢出到磁盘,因此这也应该可以减轻您潜在的内存消耗问题。
//根据新信息进行编辑。
我无法提供确切的答案,但我可以帮助您了解适用于大部分代码的方法:
执行 groupByKey 后,数据将被分区,这样单个键的每个实例都在同一台机器上。现在我们知道单个 "u" 的所有 "p's" 都在同一台机器上,我们可以开始操作这些值了。
让我们以我的初始代码为起点,但稍微修改一下。
rdd_1 = yourRdd.groupByKey()
rdd_2 = rdd1.mapValues(mapFunction).flatMapValues()
mapFunction 是魔法发生的地方。该函数将采用 p 的所有值的元组输入,然后输出一个元组列表,其中每个元组是一对 p 值。
#Updated based on Zero's recommendation of generators.
def mapFunction(tple):
l = list(tple)
for i in range(len(tple))
for j in l[i+1:]
yield (l[i], j)
当你到达 i=len(tple) 时,你需要添加一些东西来处理,这样我们就不会出现越界异常。您仍然可能 运行 遇到内存问题,但下一部分应该可以帮助您解决这个问题。
我认为我们可以扰乱您的执行程序配置。鉴于您的机器设置,我认为我们可以创建更大的执行程序。此外,您的应用程序没有缓存我知道的数据,因此我们可以将所有执行程序内存设置为用于我修改 memoryFraction 设置的对象。我发现一些更大的执行器比许多更小的执行器更好(尽管 运行 YARN,一直很难获得大量资源,但那是完全不同的话题)。尝试使用 16 到 32gb 的执行器,2-5 个内核。
我会做一个类似这样的 spark sumbit:
spark-submit --master yarn-client --driver-memory 4g --executor-memory 16g --num-executors 30 --executor-cores 4 --conf spark.storage.memoryFraction=0
如果您在任何地方 caching/persisting 数据,请跳过 memoryFraction 设置
您可以尝试的一件事是将计算移动到 DataFrame
:
from pyspark.sql.functions import col
rdd = ...
df = rdd.toDF(["u", "p"])
xs = df.alias("xs")
ys = df.alias("ys")
result = (xs
.join(ys, (col("xs.u") == col("ys.u")) & (col("xs.p") < col("ys.p")))
.groupBy(col("xs.p"), col("ys.p"))
.count())
虽然我不是特别乐观。如果您想要一个确切的答案,那么必须以一种或另一种方式打乱数据。
挑战: 数据是 [(u, p), (u, p), ...] 形状的 RDD,其中 u 和 p 都是字符串。我具有挑战性的期望输出是 [((p,p), u), ..],其中每个元素中的键是共享 u 的所有 p 的组合。
已经尝试过:
- 使用笛卡尔和滤波器模拟组合。
- 尝试对每个元素使用 Python 中的 itertools.combinations,然后使用 flatMap,就像这样:
[(tuple(sorted(e)), x[0]) for e in combinations(x[1].split(','),2)])
我已经尝试过的问题是执行程序节点似乎出现故障,可能是由于内存消耗。
有什么建议吗?
编辑
以下是问题的更多信息和背景:
我正在寻找每两个 "p" 交集的重叠 count/cardinality,其中 "p" 是集合 ID,"u" 是成员的一个集合。
输入是一个巨大的"p"和"u"之间的关系记录列表,例如:
[(u1,p1), (u2, p2), (u1, p2), (u2, p1), (u1, p3)]
期望的输出是:[((p1,p2), 2), ((p1, p3), 1), ((p2, p3), 1)]
(注意组合而不是排列)
关于输入数据的更多细节:
- 有 50k - 100k 个不同的 "p"s。
- 一个"p"可以有几亿个唯一的"u"。
- 可以有重复的输入数据行。换句话说,
(u,p)
的多个实例,但是在将 "u" 的列表视为交集步骤的给定 "p" 的集合时,这并没有改变问题。
至于硬件,我在 EMR(Yarn 上的 Spark 1.3.1)上使用了大约 41 m3.xlarges 并以这种方式关闭 shell:
./pyspark --master yarn-client --driver-memory 4G --executor-memory 3G --num-executors 160
yourRdd.groupByKey().map(lambda (a,b): (b,a))
group by key会将"u"的所有值聚合成
[(u1,(p1,p2,p3..,px)),(ux,(p1,...,px))]
地图将交换键和值,这应该会提供所需的输出。当没有足够的内存可用时,groupByKey 会溢出到磁盘,因此这也应该可以减轻您潜在的内存消耗问题。
//根据新信息进行编辑。
我无法提供确切的答案,但我可以帮助您了解适用于大部分代码的方法:
执行 groupByKey 后,数据将被分区,这样单个键的每个实例都在同一台机器上。现在我们知道单个 "u" 的所有 "p's" 都在同一台机器上,我们可以开始操作这些值了。
让我们以我的初始代码为起点,但稍微修改一下。
rdd_1 = yourRdd.groupByKey()
rdd_2 = rdd1.mapValues(mapFunction).flatMapValues()
mapFunction 是魔法发生的地方。该函数将采用 p 的所有值的元组输入,然后输出一个元组列表,其中每个元组是一对 p 值。
#Updated based on Zero's recommendation of generators.
def mapFunction(tple):
l = list(tple)
for i in range(len(tple))
for j in l[i+1:]
yield (l[i], j)
当你到达 i=len(tple) 时,你需要添加一些东西来处理,这样我们就不会出现越界异常。您仍然可能 运行 遇到内存问题,但下一部分应该可以帮助您解决这个问题。
我认为我们可以扰乱您的执行程序配置。鉴于您的机器设置,我认为我们可以创建更大的执行程序。此外,您的应用程序没有缓存我知道的数据,因此我们可以将所有执行程序内存设置为用于我修改 memoryFraction 设置的对象。我发现一些更大的执行器比许多更小的执行器更好(尽管 运行 YARN,一直很难获得大量资源,但那是完全不同的话题)。尝试使用 16 到 32gb 的执行器,2-5 个内核。
我会做一个类似这样的 spark sumbit:
spark-submit --master yarn-client --driver-memory 4g --executor-memory 16g --num-executors 30 --executor-cores 4 --conf spark.storage.memoryFraction=0
如果您在任何地方 caching/persisting 数据,请跳过 memoryFraction 设置
您可以尝试的一件事是将计算移动到 DataFrame
:
from pyspark.sql.functions import col
rdd = ...
df = rdd.toDF(["u", "p"])
xs = df.alias("xs")
ys = df.alias("ys")
result = (xs
.join(ys, (col("xs.u") == col("ys.u")) & (col("xs.p") < col("ys.p")))
.groupBy(col("xs.p"), col("ys.p"))
.count())
虽然我不是特别乐观。如果您想要一个确切的答案,那么必须以一种或另一种方式打乱数据。