Spark 中笛卡尔的替代品?

Alternatives to cartesian in Spark?

我有一个这种类型的 RDD:

[(1, [3, 10, 11]), (2, [3, 4, 10, 11]), (3, [1, 4]), (4, [2, 3, 10])...]

我需要一个遵循这个规则的函数:

如果键x在其值列表中不包含键y(反之亦然),则输出一个元组具有以下语法:

[(x, [y, len(values_x ^ values_y)]), ...]

其中 len(values_x ^ values_y) 是两个键之间共有的值的数量。如果此值为 0(即没有共同的值),则跳过这对键。

例如,从上面的示例中,输出应该是:

(1, [2, 3]) # because keys 1 and 2 share the values 3, 10, 11
(1, [4, 2]) # because keys 1 and 4 share the values 3, 10
skipping: (2, [1, 3]) is the inverse of (1, [2, 3]), so it can be skipped
(2, [3, 1]) # because keys 2 and 3 share the value 4
...

13 对(以及其他类似情况)被跳过,因为键 3 包含在键 1 的列表值中,并且反之亦然。

我实施的一个解决方案(但我一点也不喜欢)是使用 cartesian 函数创建键之间的所有组合,然后使用映射和过滤来删除不必要的对.

不使用cartesian有没有更好的解决方案?

首先让我们定义一些助手:

def swap(x):
    """Given a tuple (x1, x2) return (x2, 1)"""
    return (x[1], 1)

def filter_source(x):
    """Check if s1 < s2 in (x, (s1, s2))"""
    return x[1][0] < x[1][1]

def reshape(kv):
    """Reshape ((k1, k2), v) to get final result"""
    ((k1, k2), v) = kv
    return (k1, (k2, v))

并创建一个示例 RDD:

rdd = sc.parallelize([
    (1, [3, 10, 11]), (2, [3, 4, 10, 11]),
    (3, [1, 4]), (4, [2, 3, 10])])

最后你可以做这样的事情了:

from operator import add

flattened = rdd.flatMap(lambda kv: ((v, kv[0]) for v in kv[1])) # Flatten input
flattened.first()
# (1, 3) <- from (3, [1, 4])

result = (flattened 
    .join(flattened) # Perform self join using value from input as key
    .filter(filter_source) # Remove pairs from the same source
    .map(swap)
    .reduceByKey(add)
    .map(reshape)) # Get final output