如何有效地将新密钥添加到 pyspark 中的 RDD
How to efficiently add a new key to an RDD in pyspark
我有两种 RDD 格式,第一种是 ((provider, currency), value)
,其中键是 (provider, currency)
,第二种是 (provider, value)
,其中键是 provider
。
我想做的是将 A 从 (provider, value)
格式转换为 ((provider, currency), value)
格式。我有一个 B ((provider, currency), value)
RDD,我会在那里拿钥匙。然后,我将使用这些键来扩展 RDD A,这样 (provider, value)
RDD 中的每个 value
都会对新的 currency
中的每个 currency
重复自身((provider, currency), value)
RDD.
如何以高效的方式完成这项工作,而不必收集 () RDD 并循环遍历它们?
例如:
来自 RDD A 的项目将是:
(1773570, 4135.7998046875)
那么来自 RDD B 的一些键将是
[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]
输出RDD应该是:
[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]
一个可能的解决方案是:
def get_keys(rdd):
return rdd.map(lambda item: (item[0])).collect()
def canonicalize_keys(sc, feature, keys):
def transform(item, keys):
return [
((item[0], currency_pair), item[1])
for provider_id, currency_pair in keys
if provider_id == item[0]]
return sc.parallelize(feature
.map(lambda item: transform(item, keys))
.reduce(lambda a, b: a + b))
在这里,我使用 get_keys
从 RDD B 获取密钥,然后我使用这些密钥转换 RDD A。这里的问题是,如果我有很多 currency_pairs 我会从 JVM 得到 OutOfMemoryErrors。
试试这个:
给定 Ardd = RDD[(provider, value)]
和 Brdd = RDD[((provider, currency), value)]
,您要做的是连接 Ardd
和 Brdd
,这样 newRDD
的形式就是 RDD[((provider, currency), value)]
。其中 value
是指从 Ardd
.
中找到的值
为此,我们所做的是:
一行解法:
newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))
逐步说明:
从 Brdd
获取密钥:Brdd_keys = Brdd.map(lambda x: x[0])
。输出具有以下形式:RDD[(provider, currency)]
加入 Ardd 和 Brdd_keys:AB = Ardd.join(Brdd_keys)
。输出具有以下形式:RDD[(provider, (value, currency))]
映射到最终形式:newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0]))
。输出现在具有 RDD[((provider, currency), value)]
的形式
我有两种 RDD 格式,第一种是 ((provider, currency), value)
,其中键是 (provider, currency)
,第二种是 (provider, value)
,其中键是 provider
。
我想做的是将 A 从 (provider, value)
格式转换为 ((provider, currency), value)
格式。我有一个 B ((provider, currency), value)
RDD,我会在那里拿钥匙。然后,我将使用这些键来扩展 RDD A,这样 (provider, value)
RDD 中的每个 value
都会对新的 currency
中的每个 currency
重复自身((provider, currency), value)
RDD.
如何以高效的方式完成这项工作,而不必收集 () RDD 并循环遍历它们?
例如:
来自 RDD A 的项目将是:
(1773570, 4135.7998046875)
那么来自 RDD B 的一些键将是
[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]
输出RDD应该是:
[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]
一个可能的解决方案是:
def get_keys(rdd):
return rdd.map(lambda item: (item[0])).collect()
def canonicalize_keys(sc, feature, keys):
def transform(item, keys):
return [
((item[0], currency_pair), item[1])
for provider_id, currency_pair in keys
if provider_id == item[0]]
return sc.parallelize(feature
.map(lambda item: transform(item, keys))
.reduce(lambda a, b: a + b))
在这里,我使用 get_keys
从 RDD B 获取密钥,然后我使用这些密钥转换 RDD A。这里的问题是,如果我有很多 currency_pairs 我会从 JVM 得到 OutOfMemoryErrors。
试试这个:
给定 Ardd = RDD[(provider, value)]
和 Brdd = RDD[((provider, currency), value)]
,您要做的是连接 Ardd
和 Brdd
,这样 newRDD
的形式就是 RDD[((provider, currency), value)]
。其中 value
是指从 Ardd
.
为此,我们所做的是:
一行解法:
newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))
逐步说明:
从
Brdd
获取密钥:Brdd_keys = Brdd.map(lambda x: x[0])
。输出具有以下形式:RDD[(provider, currency)]
加入 Ardd 和 Brdd_keys:
AB = Ardd.join(Brdd_keys)
。输出具有以下形式:RDD[(provider, (value, currency))]
映射到最终形式:
newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0]))
。输出现在具有RDD[((provider, currency), value)]
的形式