使用 Spark Java RDD 匹配数据
Matching Data using Spark Java RDD
在我最近的 BigData 项目中,我需要使用 Spark。
第一个要求如下
我们有两组来自不同数据源的数据,比如一组来自 flatFile,另一组来自 HDFS。
数据集可能有也可能没有公共列,但我们手头有映射规则,例如
函数 1(data1.columnA)==函数 2(data2.columnB)
我试图通过在另一个内部的 rdd 上执行 foreach 来实现这一点,但这在 Spark 中是不允许的,
</p>
<p>org.apache.spark.SparkException:RDD转换和动作只能被驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 是无效的,因为无法在 rdd1.map 转换内部执行值转换和计数操作。有关详细信息,请参阅 SPARK-5063。
在 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
在 org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
在 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
在 org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
在 com.pramod.engine.DataMatchingEngine.lambda$execute$4e658232$1(DataMatchingEngine.java:44)
在 com.pramod.engine.DataMatchingEngine$$Lambda$9/1172080526.call(未知来源)
在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
在 scala.collection.Iterator$class.foreach(Iterator.scala:727)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)</p>
<p>
请帮助我实现这一目标的最佳方法。
听起来你有两个 RDD,让我们称它们为 A
和 B
,它们需要连接,但是 ID 需要一些修改才能这样做。假设这是正确的...
// The data to be processed. How you load it and
// what it looks like is not important.
case class Item (id : Int)
val A = sc.parallelize(Seq(Item(1), Item(2)))
val B = sc.parallelize(Seq(Item(10), Item(20)))
// We then map it to `key, value`, to keep things simple
// A.id should be id * 100 and B.id should be id * 10
val aWithKey = A.map(x => (x.id * 100, x))
val bWithKey = B.map(x => (x.id * 10, x))
// We can now join the two data sets.
aWithKey.join(bWithKey).collect
在我最近的 BigData 项目中,我需要使用 Spark。
第一个要求如下
我们有两组来自不同数据源的数据,比如一组来自 flatFile,另一组来自 HDFS。
数据集可能有也可能没有公共列,但我们手头有映射规则,例如
函数 1(data1.columnA)==函数 2(data2.columnB)
我试图通过在另一个内部的 rdd 上执行 foreach 来实现这一点,但这在 Spark 中是不允许的,
</p>
<p>org.apache.spark.SparkException:RDD转换和动作只能被驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 是无效的,因为无法在 rdd1.map 转换内部执行值转换和计数操作。有关详细信息,请参阅 SPARK-5063。
在 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
在 org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
在 org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
在 org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
在 com.pramod.engine.DataMatchingEngine.lambda$execute$4e658232$1(DataMatchingEngine.java:44)
在 com.pramod.engine.DataMatchingEngine$$Lambda$9/1172080526.call(未知来源)
在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
在 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
在 scala.collection.Iterator$class.foreach(Iterator.scala:727)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
在 org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)</p>
<p>
请帮助我实现这一目标的最佳方法。
听起来你有两个 RDD,让我们称它们为 A
和 B
,它们需要连接,但是 ID 需要一些修改才能这样做。假设这是正确的...
// The data to be processed. How you load it and
// what it looks like is not important.
case class Item (id : Int)
val A = sc.parallelize(Seq(Item(1), Item(2)))
val B = sc.parallelize(Seq(Item(10), Item(20)))
// We then map it to `key, value`, to keep things simple
// A.id should be id * 100 and B.id should be id * 10
val aWithKey = A.map(x => (x.id * 100, x))
val bWithKey = B.map(x => (x.id * 10, x))
// We can now join the two data sets.
aWithKey.join(bWithKey).collect