用于对称运算的 spark 中的笛卡尔上三角:`x*(x+1)//2` 而不是 `x**2`
Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`
我需要为 Spark 中的列表项计算成对对称分数。 IE。 score(x[i],x[j]) = score(x[j], x[i])
。一种解决方案是使用 x.cartesian(x)
。但是它将执行 x**2
操作,而不是最少的 x*(x+1)//2
操作。
在 Spark 中解决这个问题最有效的方法是什么?
PS。在纯 Python 中,我会像这样使用迭代器:
class uptrsq_range(object):
def __init__(self, n):
self._n_ = n
self._length = n*(n+1) // 2
def __iter__(self):
for ii in range(self._n_):
for jj in range(ii+1):
yield (ii,jj)
def __len__(self):
"""
recepe by sleblanc @ Whosebug
"""
"This method returns the total number of elements"
if self._length:
return self._length
else:
raise NotImplementedError("Infinite sequence has no length")
# or simply return None / 0 depending
# on implementation
for i,j in uptrsq_range(len(x)):
score(x[i], x[j])
最通用的方法是在 cartesian
之后加上 filter
。例如:
rdd = sc.parallelize(range(10))
pairs = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
pairs.count()
## 45
如果RDD比较小你可以收集、广播和flatMap
:
xs = sc.broadcast(rdd.collect())
pairs = rdd.flatMap(lambda y: [(x, y) for x in xs.value if x < y])
pairs.count()
## 45
如果可以在 flatMap
中进一步过滤数据以减少生成值的数量,这将特别有用。
如果数据太大而无法收集/存储在内存中但可以轻松计算(如数字范围)或可以从工作人员(本地可访问的数据库)有效地访问,您可以 flatMap
作为以上或使用 mapPartitions
例如这样:
def some_function(iter):
import sqlite3
conn = sqlite3.connect('example.db')
c = conn.cursor()
query = ...
for x in iter:
# fetch some data from a database
c.execute(query, (x, ))
for y in c.fetchall():
yield (x, y)
rdd.mapPartitions(some_function)
我需要为 Spark 中的列表项计算成对对称分数。 IE。 score(x[i],x[j]) = score(x[j], x[i])
。一种解决方案是使用 x.cartesian(x)
。但是它将执行 x**2
操作,而不是最少的 x*(x+1)//2
操作。
在 Spark 中解决这个问题最有效的方法是什么?
PS。在纯 Python 中,我会像这样使用迭代器:
class uptrsq_range(object):
def __init__(self, n):
self._n_ = n
self._length = n*(n+1) // 2
def __iter__(self):
for ii in range(self._n_):
for jj in range(ii+1):
yield (ii,jj)
def __len__(self):
"""
recepe by sleblanc @ Whosebug
"""
"This method returns the total number of elements"
if self._length:
return self._length
else:
raise NotImplementedError("Infinite sequence has no length")
# or simply return None / 0 depending
# on implementation
for i,j in uptrsq_range(len(x)):
score(x[i], x[j])
最通用的方法是在 cartesian
之后加上 filter
。例如:
rdd = sc.parallelize(range(10))
pairs = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
pairs.count()
## 45
如果RDD比较小你可以收集、广播和flatMap
:
xs = sc.broadcast(rdd.collect())
pairs = rdd.flatMap(lambda y: [(x, y) for x in xs.value if x < y])
pairs.count()
## 45
如果可以在 flatMap
中进一步过滤数据以减少生成值的数量,这将特别有用。
如果数据太大而无法收集/存储在内存中但可以轻松计算(如数字范围)或可以从工作人员(本地可访问的数据库)有效地访问,您可以 flatMap
作为以上或使用 mapPartitions
例如这样:
def some_function(iter):
import sqlite3
conn = sqlite3.connect('example.db')
c = conn.cursor()
query = ...
for x in iter:
# fetch some data from a database
c.execute(query, (x, ))
for y in c.fetchall():
yield (x, y)
rdd.mapPartitions(some_function)