在自定义函数上加入两个 RDD - SPARK
Join two RDDs on custom function - SPARK
是否可以通过自定义函数在 Spark 中连接两个 RDD?
我有两个以字符串作为键的大 RDD。我想加入他们,而不是使用经典的加入,而是使用自定义函数,如:
def my_func(a,b):
return Lev.distance(a,b) < 2
result_rdd = rdd1.join(rdd2, my_func)
如果不可能,是否有任何替代方案可以继续利用 spark 集群的优势?
我写了类似这样的东西,但是 pyspark 将无法在我的小集群上分发工作。
def custom_join(rdd1, rdd2, my_func):
a = rdd1.sortByKey().collect()
b = rdd2.sortByKey().collect()
i = 0
j = 0
res = []
while i < len(a) and j < len(b):
if my_func(a[i][0],b[j][0]):
res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))]
i+=1
j+=1
elif a[i][0] < b[j][0]:
i+=1
else:
j+=1
return sc.parallelize(res)
提前致谢(抱歉我的英语不好,因为我是意大利人)
您可以使用笛卡尔坐标,然后根据条件进行过滤。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])
def customFunc(x):
# You may use any condition here
return x[0][0] ==x[1][0]
print(x.join(y).collect()) # normal join
# replicating join with cartesian
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect())
输出:
[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]
是否可以通过自定义函数在 Spark 中连接两个 RDD? 我有两个以字符串作为键的大 RDD。我想加入他们,而不是使用经典的加入,而是使用自定义函数,如:
def my_func(a,b):
return Lev.distance(a,b) < 2
result_rdd = rdd1.join(rdd2, my_func)
如果不可能,是否有任何替代方案可以继续利用 spark 集群的优势? 我写了类似这样的东西,但是 pyspark 将无法在我的小集群上分发工作。
def custom_join(rdd1, rdd2, my_func):
a = rdd1.sortByKey().collect()
b = rdd2.sortByKey().collect()
i = 0
j = 0
res = []
while i < len(a) and j < len(b):
if my_func(a[i][0],b[j][0]):
res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))]
i+=1
j+=1
elif a[i][0] < b[j][0]:
i+=1
else:
j+=1
return sc.parallelize(res)
提前致谢(抱歉我的英语不好,因为我是意大利人)
您可以使用笛卡尔坐标,然后根据条件进行过滤。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])
def customFunc(x):
# You may use any condition here
return x[0][0] ==x[1][0]
print(x.join(y).collect()) # normal join
# replicating join with cartesian
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect())
输出:
[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]