如何有效地将新密钥添加到 pyspark 中的 RDD

How to efficiently add a new key to an RDD in pyspark

我有两种 RDD 格式,第一种是 ((provider, currency), value),其中键是 (provider, currency),第二种是 (provider, value),其中键是 provider

我想做的是将 A(provider, value) 格式转换为 ((provider, currency), value) 格式。我有一个 B ((provider, currency), value) RDD,我会在那里拿钥匙。然后,我将使用这些键来扩展 RDD A,这样 (provider, value) RDD 中的每个 value 都会对新的 currency 中的每个 currency 重复自身((provider, currency), value) RDD.

如何以高效的方式完成这项工作,而不必收集 () RDD 并循环遍历它们?

例如:

来自 RDD A 的项目将是:

(1773570, 4135.7998046875)

那么来自 RDD B 的一些键将是

[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]

输出RDD应该是:

[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]

一个可能的解决方案是:

def get_keys(rdd):
    return rdd.map(lambda item: (item[0])).collect()

def canonicalize_keys(sc, feature, keys):
    def transform(item, keys):
        return [
            ((item[0], currency_pair), item[1])
                for provider_id, currency_pair in keys
                    if provider_id == item[0]]
    return sc.parallelize(feature
        .map(lambda item: transform(item, keys))
        .reduce(lambda a, b: a + b))

在这里,我使用 get_keys 从 RDD B 获取密钥,然后我使用这些密钥转换 RDD A。这里的问题是,如果我有很多 currency_pairs 我会从 JVM 得到 OutOfMemoryErrors。

试试这个: 给定 Ardd = RDD[(provider, value)]Brdd = RDD[((provider, currency), value)],您要做的是连接 ArddBrdd,这样 newRDD 的形式就是 RDD[((provider, currency), value)]。其中 value 是指从 Ardd.

中找到的值

为此,我们所做的是:

一行解法:

newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))

逐步说明:

  1. Brdd 获取密钥:Brdd_keys = Brdd.map(lambda x: x[0])。输出具有以下形式:RDD[(provider, currency)]

  2. 加入 Ardd 和 Brdd_keys:AB = Ardd.join(Brdd_keys)。输出具有以下形式:RDD[(provider, (value, currency))]

  3. 映射到最终形式:newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0]))。输出现在具有 RDD[((provider, currency), value)]

  4. 的形式