如何将查找(广播)RDD(或数据集)访问到其他 RDD 映射函数

How to access lookup(broadcast) RDD(or dataset) into other RDD map function

我是 spark 和 scala 的新手,刚开始学习...我在 CDH 5.1.3 上使用 spark 1.0.0

我得到一个名为 dbTableKeyValueMap 的广播 rdd:RDD[(String, String)],我想使用 dbTableKeyValueMap 来处理我的 fileRDD(每行有 300 多列)。这是代码:

val get = fileRDD.map({x =>
  val tmp = dbTableKeyValueMap.lookup(x)
  tmp
})

运行 这在本地挂起 and/or 有时会出现错误:

scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)

我可以理解访问一个 RDD 内部会有问题,如果集合的位置和大小出现问题。对我来说,笛卡尔积不是选项,因为文件 RDD 中的记录很大(每行有 300 多列) ...就像我使用分布式缓存在设置方法中加载此 dbTableKeyValueMap 并在 hadoop java mapreduce 代码的 MAP 中使用一样,我想在 spark map 中使用类似的方式...我无法找到简单的引用类似用例的示例... 我想一个接一个地遍历 fileRDD 行并在 "each column" 上做一些转换、祝福、查找等以进行进一步处理...... 或者还有其他方法可以使用 dbTableKeyValueMap 作为 scala 集合而不是 spark RDD

请帮忙

I can understand accessing one RDD inside other will have issues, if locality and size of collection come into picture

不是真的。它根本行不通。 Spark 不支持嵌套操作和转换。这意味着不能使用广播的 RDD 来访问数据。

通常您有以下三种选择:

  • 收集RDD并广播局部变量(参见:Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
  • 使用join(这里好像是你需要的):

    fileRDD.map(x => (x, null)).join(fileRDD)
    
  • 使用所有工作人员都可以访问的外部存储

谢谢....最简单的事情就是将查找 RDD 转换为 "scala collection" 并且很好!我可以在任何 RDD 的转换中访问它....

val scalaMap = dbTableKeyValueMap.collectAsMap.toMap
val broadCastLookupMap = sc.broadcast(scalaMap)

val get = fileRDD.map({x =>
  val tmp = broadCastLookupMap.value.get(x).head
  tmp
})

这个简单的解决方案应该记录在某个地方供早期学习者使用。我花了一段时间才弄明白...

感谢您的帮助...