如何将 spark RDD 与 JdbcRDD 的结果相匹配

how to match a spark RDD with results from JdbcRDD

我有一个 mydata:RDD[顶点],其中每个顶点都有一个 ID 属性。我在 dbdata:JdbcRDD[String] 中也有来自 mysql 的数据,其中每个字符串中的第一个位置由一个 id 值占据,该 id 值与 RDD 集合中 Vertex.id 中的值匹配。

我想用来自 JdbcRDD[String] 的数据填充每个 Vertex 的变量,但我没有更有效的方法来匹配两个数据集。我正在尝试这个,但它不起作用:

mydata.map(x => val arow = dbdata.filter { y => y.split(",")(0).toString == x.id }.collect
           x.prop1 = arow(1)
           // ... for all other values in arow
)

然而,我得到了

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; 

所以我猜我的方法是错误的...

您的问题听起来很适合连接操作。下面的代码片段应该可以解决问题

val mydata: RDD[Vertex] = ...
val dbdata: JdbcRDD[String] = ...

val mydataKV = mydata.map(x => (x.id, x))
val dbdataKV = dbdata.map(x => ((x.split(",")(0), x)))

val result = mydataKV.join(dbdataKV).mapValues{
  case (vertex, db) =>
    // fill vertex fields
    vertex.prop1 = db.split(",")(1)
    ...

    vertex
}