按值过滤 RDD PySpark
Filter RDD by values PySpark
我正在使用 PySpark,我正在寻找一种方法来检查:
对于给定的 check_number = 01
如果我的rdd1
中第三个元素的值不包含check_number
==> 从 rdd2
..
获取有关此 check_number 的所有信息
给定:
rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
u'serviceXXX',
u'testAB_02',
u'2016-07-03')])
假设第一个元素是ID
,第二个是服务名,第三个是测试名,带ID
,第四个是日期
rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02'),
(u'XXXX52547412558933nnBlmquhdyhM',
u'02',
u'2016-11-04')])
假设第一个元素是一个ID,第二个元素是一个测试id,最后一个元素是一个日期。
因此,我的 rdd1
testAB_02
与我的 check_number 不匹配(因此服务名称必须以 check_number 的值结尾).我的目标是从 rdd2
获取所有行,并将 01
作为测试 ID。这里的预期输出必须是:
[(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02')
这是我的代码:
def update_typesdecohorte_table(rdd1, rdd2):
if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True:
new_rdd2 = rdd2.filter(lambda x : x[1] == check_number)
else:
pass
return new_rdd2
new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)
至于给出:
[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]
这段代码有效,但我不喜欢这种方法。最有效的方法是什么?
如果你想从 rdd2 中获取所有在 rdd1 中没有匹配元素的记录,你可以使用 cartesian
:
new_rdd2 = rdd1.cartesian(rdd2)
.filter(lambda r: not r[0][2].endswith(r[1][1]))
.map(lambda r: r[1])
如果你的 check_number 是固定的,最后按这个值过滤:
new_rdd2.filter(lambda r: r[1] == check_number).collect()
但是,如果您的 check_number 是固定的并且两个 RDD 都很大,那么它可能比您的解决方案更慢,因为它需要在连接期间对分区进行改组(您的代码仅执行非改组转换)。
我正在使用 PySpark,我正在寻找一种方法来检查:
对于给定的 check_number = 01
如果我的rdd1
中第三个元素的值不包含check_number
==> 从 rdd2
..
给定:
rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
u'serviceXXX',
u'testAB_02',
u'2016-07-03')])
假设第一个元素是ID
,第二个是服务名,第三个是测试名,带ID
,第四个是日期
rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02'),
(u'XXXX52547412558933nnBlmquhdyhM',
u'02',
u'2016-11-04')])
假设第一个元素是一个ID,第二个元素是一个测试id,最后一个元素是一个日期。
因此,我的 rdd1
testAB_02
与我的 check_number 不匹配(因此服务名称必须以 check_number 的值结尾).我的目标是从 rdd2
获取所有行,并将 01
作为测试 ID。这里的预期输出必须是:
[(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02')
这是我的代码:
def update_typesdecohorte_table(rdd1, rdd2):
if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True:
new_rdd2 = rdd2.filter(lambda x : x[1] == check_number)
else:
pass
return new_rdd2
new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)
至于给出:
[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]
这段代码有效,但我不喜欢这种方法。最有效的方法是什么?
如果你想从 rdd2 中获取所有在 rdd1 中没有匹配元素的记录,你可以使用 cartesian
:
new_rdd2 = rdd1.cartesian(rdd2)
.filter(lambda r: not r[0][2].endswith(r[1][1]))
.map(lambda r: r[1])
如果你的 check_number 是固定的,最后按这个值过滤:
new_rdd2.filter(lambda r: r[1] == check_number).collect()
但是,如果您的 check_number 是固定的并且两个 RDD 都很大,那么它可能比您的解决方案更慢,因为它需要在连接期间对分区进行改组(您的代码仅执行非改组转换)。