如何用 RDD 类型 [String, Int] 的值替换 [String] 的 RDD 类型
How to replace RDD type of [String] with values of RDD type [String, Int]
对于最初问题的混淆,我们深表歉意。这是一个带有可重现示例的问题:
我的 rdd 为 [String]
,我的 rdd 为 [String, Long]
。我想根据第二个 String
与第一个 String
的匹配得到 [Long]
的 rdd。示例:
//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
"This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result: RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()
因此,我想要一个带有单词索引的 RDD 而不是 "Spark can also be used for compute intensive tasks"
中的单词。
再次抱歉,谢谢
如果我没理解错的话,您对 Spark can also be used for compute intensive tasks
中也出现的作品索引感兴趣。
如果是这样 - 这里有两个输出相同但性能特征不同的版本:
val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")
// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }
// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }
第一个选项用我们感兴趣的索引词创建第二个 RDD,使用 keyBy
将其转换为 PairRDD(键 == 值!),join
s它与您的 indexWords
RDD,然后映射以仅获取索引。
只有当 "interesting words" 的列表已知不会太大时才应使用第二个选项 - 因此我们可以将其保留为列表(而不是 RDD
),并让 Spark将其序列化并发送给工作人员以供每个任务使用。然后我们使用 collect(f: PartialFunction[T, U])
应用此部分函数立即获得 "filter" 和 "map" - 如果单词存在于列表中,我们只 return 一个值,如果所以 - 我们 return 索引。
我收到 SPARK-5063 错误并给出 ,我找到了问题的解决方案:
//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)
对于最初问题的混淆,我们深表歉意。这是一个带有可重现示例的问题:
我的 rdd 为 [String]
,我的 rdd 为 [String, Long]
。我想根据第二个 String
与第一个 String
的匹配得到 [Long]
的 rdd。示例:
//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
"This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result: RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()
因此,我想要一个带有单词索引的 RDD 而不是 "Spark can also be used for compute intensive tasks"
中的单词。
再次抱歉,谢谢
如果我没理解错的话,您对 Spark can also be used for compute intensive tasks
中也出现的作品索引感兴趣。
如果是这样 - 这里有两个输出相同但性能特征不同的版本:
val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")
// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }
// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }
第一个选项用我们感兴趣的索引词创建第二个 RDD,使用 keyBy
将其转换为 PairRDD(键 == 值!),join
s它与您的 indexWords
RDD,然后映射以仅获取索引。
只有当 "interesting words" 的列表已知不会太大时才应使用第二个选项 - 因此我们可以将其保留为列表(而不是 RDD
),并让 Spark将其序列化并发送给工作人员以供每个任务使用。然后我们使用 collect(f: PartialFunction[T, U])
应用此部分函数立即获得 "filter" 和 "map" - 如果单词存在于列表中,我们只 return 一个值,如果所以 - 我们 return 索引。
我收到 SPARK-5063 错误并给出
//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)