如何将 spark RDD 与 JdbcRDD 的结果相匹配
how to match a spark RDD with results from JdbcRDD
我有一个 mydata:RDD[顶点],其中每个顶点都有一个 ID 属性。我在 dbdata:JdbcRDD[String] 中也有来自 mysql 的数据,其中每个字符串中的第一个位置由一个 id 值占据,该 id 值与 RDD 集合中 Vertex.id 中的值匹配。
我想用来自 JdbcRDD[String] 的数据填充每个 Vertex 的变量,但我没有更有效的方法来匹配两个数据集。我正在尝试这个,但它不起作用:
mydata.map(x => val arow = dbdata.filter { y => y.split(",")(0).toString == x.id }.collect
x.prop1 = arow(1)
// ... for all other values in arow
)
然而,我得到了
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;
所以我猜我的方法是错误的...
您的问题听起来很适合连接操作。下面的代码片段应该可以解决问题
val mydata: RDD[Vertex] = ...
val dbdata: JdbcRDD[String] = ...
val mydataKV = mydata.map(x => (x.id, x))
val dbdataKV = dbdata.map(x => ((x.split(",")(0), x)))
val result = mydataKV.join(dbdataKV).mapValues{
case (vertex, db) =>
// fill vertex fields
vertex.prop1 = db.split(",")(1)
...
vertex
}
我有一个 mydata:RDD[顶点],其中每个顶点都有一个 ID 属性。我在 dbdata:JdbcRDD[String] 中也有来自 mysql 的数据,其中每个字符串中的第一个位置由一个 id 值占据,该 id 值与 RDD 集合中 Vertex.id 中的值匹配。
我想用来自 JdbcRDD[String] 的数据填充每个 Vertex 的变量,但我没有更有效的方法来匹配两个数据集。我正在尝试这个,但它不起作用:
mydata.map(x => val arow = dbdata.filter { y => y.split(",")(0).toString == x.id }.collect
x.prop1 = arow(1)
// ... for all other values in arow
)
然而,我得到了
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;
所以我猜我的方法是错误的...
您的问题听起来很适合连接操作。下面的代码片段应该可以解决问题
val mydata: RDD[Vertex] = ...
val dbdata: JdbcRDD[String] = ...
val mydataKV = mydata.map(x => (x.id, x))
val dbdataKV = dbdata.map(x => ((x.split(",")(0), x)))
val result = mydataKV.join(dbdataKV).mapValues{
case (vertex, db) =>
// fill vertex fields
vertex.prop1 = db.split(",")(1)
...
vertex
}