与 Apache Spark (Pyspark) 的组合和质量集交叉点

Combinations And Mass Set Intersections With Apache Spark (Pyspark)

挑战: 数据是 [(u, p), (u, p), ...] 形状的 RDD,其中 u 和 p 都是字符串。我具有挑战性的期望输出是 [((p,p), u), ..],其中每个元素中的键是共享 u 的所有 p 的组合。

已经尝试过:

  1. 使用笛卡尔和滤波器模拟组合。
  2. 尝试对每个元素使用 Python 中的 itertools.combinations,然后使用 flatMap,就像这样:[(tuple(sorted(e)), x[0]) for e in combinations(x[1].split(','),2)])

我已经尝试过的问题是执行程序节点似乎出现故障,可能是由于内存消耗。

有什么建议吗?

编辑

以下是问题的更多信息和背景:

我正在寻找每两个 "p" 交集的重叠 count/cardinality,其中 "p" 是集合 ID,"u" 是成员的一个集合。

输入是一个巨大的"p"和"u"之间的关系记录列表,例如: [(u1,p1), (u2, p2), (u1, p2), (u2, p1), (u1, p3)]

期望的输出是:[((p1,p2), 2), ((p1, p3), 1), ((p2, p3), 1)] (注意组合而不是排列)

关于输入数据的更多细节:

  1. 有 50k - 100k 个不同的 "p"s。
  2. 一个"p"可以有几亿个唯一的"u"。
  3. 可以有重复的输入数据行。换句话说,(u,p) 的多个实例,但是在将 "u" 的列表视为交集步骤的给定 "p" 的集合时,这并没有改变问题。

至于硬件,我在 EMR(Yarn 上的 Spark 1.3.1)上使用了大约 41 m3.xlarges 并以这种方式关闭 shell: ./pyspark --master yarn-client --driver-memory 4G --executor-memory 3G --num-executors 160

yourRdd.groupByKey().map(lambda (a,b): (b,a))

group by key会将"u"的所有值聚合成

[(u1,(p1,p2,p3..,px)),(ux,(p1,...,px))]

地图将交换键和值,这应该会提供所需的输出。当没有足够的内存可用时,groupByKey 会溢出到磁盘,因此这也应该可以减轻您潜在的内存消耗问题。

//根据新信息进行编辑。

我无法提供确切的答案,但我可以帮助您了解适用于大部分代码的方法:

执行 groupByKey 后,数据将被分区,这样单个键的每个实例都在同一台机器上。现在我们知道单个 "u" 的所有 "p's" 都在同一台机器上,我们可以开始操作这些值了。

让我们以我的初始代码为起点,但稍微修改一下。

rdd_1 = yourRdd.groupByKey()
rdd_2 = rdd1.mapValues(mapFunction).flatMapValues()

mapFunction 是魔法发生的地方。该函数将采用 p 的所有值的元组输入,然后输出一个元组列表,其中每个元组是一对 p 值。

#Updated based on Zero's recommendation of generators.
def mapFunction(tple):
    l = list(tple)
    for i in range(len(tple))
        for j in l[i+1:]
             yield (l[i], j)

当你到达 i=len(tple) 时,你需要添加一些东西来处理,这样我们就不会出现越界异常。您仍然可能 运行 遇到内存问题,但下一部分应该可以帮助您解决这个问题。

我认为我们可以扰乱您的执行程序配置。鉴于您的机器设置,我认为我们可以创建更大的执行程序。此外,您的应用程序没有缓存我知道的数据,因此我们可以将所有执行程序内存设置为用于我修改 memoryFraction 设置的对象。我发现一些更大的执行器比许多更小的执行器更好(尽管 运行 YARN,一直很难获得大量资源,但那是完全不同的话题)。尝试使用 16 到 32gb 的执行器,2-5 个内核。

我会做一个类似这样的 spark sumbit:

spark-submit --master yarn-client --driver-memory 4g --executor-memory 16g --num-executors 30 --executor-cores 4  --conf spark.storage.memoryFraction=0

如果您在任何地方 caching/persisting 数据,请跳过 memoryFraction 设置

您可以尝试的一件事是将计算移动到 DataFrame:

from pyspark.sql.functions import col

rdd = ...
df = rdd.toDF(["u", "p"])

xs = df.alias("xs")
ys = df.alias("ys")

result = (xs
    .join(ys, (col("xs.u") == col("ys.u")) & (col("xs.p") < col("ys.p")))
    .groupBy(col("xs.p"), col("ys.p"))
    .count())

虽然我不是特别乐观。如果您想要一个确切的答案,那么必须以一种或另一种方式打乱数据。