如何用 RDD 类型 [String, Int] 的值替换 [String] 的 RDD 类型

How to replace RDD type of [String] with values of RDD type [String, Int]

对于最初问题的混淆,我们深表歉意。这是一个带有可重现示例的问题:

我的 rdd 为 [String],我的 rdd 为 [String, Long]。我想根据第二个 String 与第一个 String 的匹配得到 [Long] 的 rdd。示例:

//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
      "This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result:  RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()

因此,我想要一个带有单词索引的 RDD 而不是 "Spark can also be used for compute intensive tasks" 中的单词。

再次抱歉,谢谢

如果我没理解错的话,您对 Spark can also be used for compute intensive tasks 中也出现的作品索引感兴趣。

如果是这样 - 这里有两个输出相同但性能特征不同的版本:

val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")

// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }

// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }

第一个选项用我们感兴趣的索引词创建第二个 RDD,使用 keyBy 将其转换为 PairRDD(键 == 值!),joins它与您的 indexWords RDD,然后映射以仅获取索引。

只有当 "interesting words" 的列表已知不会太大时才应使用第二个选项 - 因此我们可以将其保留为列表(而不是 RDD),并让 Spark将其序列化并发送给工作人员以供每个任务使用。然后我们使用 collect(f: PartialFunction[T, U]) 应用此部分函数立即获得 "filter" 和 "map" - 如果单词存在于列表中,我们只 return 一个值,如果所以 - 我们 return 索引。

我收到 SPARK-5063 错误并给出 ,我找到了问题的解决方案:

//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)