Spark 中笛卡尔的替代品?
Alternatives to cartesian in Spark?
我有一个这种类型的 RDD:
[(1, [3, 10, 11]), (2, [3, 4, 10, 11]), (3, [1, 4]), (4, [2, 3, 10])...]
我需要一个遵循这个规则的函数:
如果键x
在其值列表中不包含键y
(反之亦然),则输出一个元组具有以下语法:
[(x, [y, len(values_x ^ values_y)]), ...]
其中 len(values_x ^ values_y)
是两个键之间共有的值的数量。如果此值为 0
(即没有共同的值),则跳过这对键。
例如,从上面的示例中,输出应该是:
(1, [2, 3]) # because keys 1 and 2 share the values 3, 10, 11
(1, [4, 2]) # because keys 1 and 4 share the values 3, 10
skipping: (2, [1, 3]) is the inverse of (1, [2, 3]), so it can be skipped
(2, [3, 1]) # because keys 2 and 3 share the value 4
...
键 1
和 3
对(以及其他类似情况)被跳过,因为键 3
包含在键 1
的列表值中,并且反之亦然。
我实施的一个解决方案(但我一点也不喜欢)是使用 cartesian
函数创建键之间的所有组合,然后使用映射和过滤来删除不必要的对.
不使用cartesian
有没有更好的解决方案?
首先让我们定义一些助手:
def swap(x):
"""Given a tuple (x1, x2) return (x2, 1)"""
return (x[1], 1)
def filter_source(x):
"""Check if s1 < s2 in (x, (s1, s2))"""
return x[1][0] < x[1][1]
def reshape(kv):
"""Reshape ((k1, k2), v) to get final result"""
((k1, k2), v) = kv
return (k1, (k2, v))
并创建一个示例 RDD:
rdd = sc.parallelize([
(1, [3, 10, 11]), (2, [3, 4, 10, 11]),
(3, [1, 4]), (4, [2, 3, 10])])
最后你可以做这样的事情了:
from operator import add
flattened = rdd.flatMap(lambda kv: ((v, kv[0]) for v in kv[1])) # Flatten input
flattened.first()
# (1, 3) <- from (3, [1, 4])
result = (flattened
.join(flattened) # Perform self join using value from input as key
.filter(filter_source) # Remove pairs from the same source
.map(swap)
.reduceByKey(add)
.map(reshape)) # Get final output
我有一个这种类型的 RDD:
[(1, [3, 10, 11]), (2, [3, 4, 10, 11]), (3, [1, 4]), (4, [2, 3, 10])...]
我需要一个遵循这个规则的函数:
如果键x
在其值列表中不包含键y
(反之亦然),则输出一个元组具有以下语法:
[(x, [y, len(values_x ^ values_y)]), ...]
其中 len(values_x ^ values_y)
是两个键之间共有的值的数量。如果此值为 0
(即没有共同的值),则跳过这对键。
例如,从上面的示例中,输出应该是:
(1, [2, 3]) # because keys 1 and 2 share the values 3, 10, 11
(1, [4, 2]) # because keys 1 and 4 share the values 3, 10
skipping: (2, [1, 3]) is the inverse of (1, [2, 3]), so it can be skipped
(2, [3, 1]) # because keys 2 and 3 share the value 4
...
键 1
和 3
对(以及其他类似情况)被跳过,因为键 3
包含在键 1
的列表值中,并且反之亦然。
我实施的一个解决方案(但我一点也不喜欢)是使用 cartesian
函数创建键之间的所有组合,然后使用映射和过滤来删除不必要的对.
不使用cartesian
有没有更好的解决方案?
首先让我们定义一些助手:
def swap(x):
"""Given a tuple (x1, x2) return (x2, 1)"""
return (x[1], 1)
def filter_source(x):
"""Check if s1 < s2 in (x, (s1, s2))"""
return x[1][0] < x[1][1]
def reshape(kv):
"""Reshape ((k1, k2), v) to get final result"""
((k1, k2), v) = kv
return (k1, (k2, v))
并创建一个示例 RDD:
rdd = sc.parallelize([
(1, [3, 10, 11]), (2, [3, 4, 10, 11]),
(3, [1, 4]), (4, [2, 3, 10])])
最后你可以做这样的事情了:
from operator import add
flattened = rdd.flatMap(lambda kv: ((v, kv[0]) for v in kv[1])) # Flatten input
flattened.first()
# (1, 3) <- from (3, [1, 4])
result = (flattened
.join(flattened) # Perform self join using value from input as key
.filter(filter_source) # Remove pairs from the same source
.map(swap)
.reduceByKey(add)
.map(reshape)) # Get final output