Spark RDD 与列表的连接操作
Spark RDDs join operation with lists
我有以下 RDD:
JavaPairRDD<List<String>, String> firstRDD = ...
firstRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent], Father
JavaPairRDD<List<String>, String> secondRDD = ...
secondRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent, Father], Person
我想执行内部联接,这样一行等于另一行如果左键是 IN(即,子列表)右键(在前面的例子中,[Man, Parent]
在[Man, Parent, Father]
)。
有什么建议吗?
谢谢!
对于 RDD(以及 JavaPairRDD),join 操作只能检查完全匹配的键。
因此我们必须将 RDD 转换为 Dataframes:
public static Dataset<Row> toDataframe(SparkSession spark, JavaPairRDD<List<String>, String> rdd) {
JavaRDD<Row> rowRDD1 = rdd.map(tuple -> {
Seq<String> key = JavaConverters.asScalaIteratorConverter(tuple._1().iterator()).asScala().toSeq();
return RowFactory.create(key, tuple._2());
});
StructType st = new StructType()
.add(new StructField("key", DataTypes.createArrayType(DataTypes.StringType), true, new MetadataBuilder().build()))
.add(new StructField("value", DataTypes.StringType, true, new MetadataBuilder().build()));
return spark.createDataFrame(rowRDD1, st);
}
对于连接标准,我们需要一个 UDF 来检查一个数组是否是另一个数组的一部分。如果元素的顺序不重要,也可以使用array_intersect。
UserDefinedFunction contains = functions.udf((Seq<String> a, Seq<String> b) -> b.containsSlice(a), DataTypes.BooleanType);
将这两个元素放在一起,我们得到
Dataset<Row> df1 = toDataframe(spark, firstRDD);
Dataset<Row> df2 = toDataframe(spark, secondRDD);
Dataset<Row> result = df1.join(df2,contains.apply(df1.col("key"), df2.col("key")));
有了输入数据
firstRDD secondRDD
+------+-----+ +------------+-----+
| key|value| | key|value|
+------+-----+ +------------+-----+
|[a, b]| A| | [a, b, c]| C|
|[b, a]| B| |[a, b, c, d]| D|
+------+-----+ +------------+-----+
我们得到
+------+-----+------------+-----+
| key|value| key|value|
+------+-----+------------+-----+
|[a, b]| A| [a, b, c]| C|
|[a, b]| A|[a, b, c, d]| D|
+------+-----+------------+-----+
请注意,使用 UDF 作为连接标准可能不是 。
我有以下 RDD:
JavaPairRDD<List<String>, String> firstRDD = ...
firstRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent], Father
JavaPairRDD<List<String>, String> secondRDD = ...
secondRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent, Father], Person
我想执行内部联接,这样一行等于另一行如果左键是 IN(即,子列表)右键(在前面的例子中,[Man, Parent]
在[Man, Parent, Father]
)。
有什么建议吗?
谢谢!
对于 RDD(以及 JavaPairRDD),join 操作只能检查完全匹配的键。
因此我们必须将 RDD 转换为 Dataframes:
public static Dataset<Row> toDataframe(SparkSession spark, JavaPairRDD<List<String>, String> rdd) {
JavaRDD<Row> rowRDD1 = rdd.map(tuple -> {
Seq<String> key = JavaConverters.asScalaIteratorConverter(tuple._1().iterator()).asScala().toSeq();
return RowFactory.create(key, tuple._2());
});
StructType st = new StructType()
.add(new StructField("key", DataTypes.createArrayType(DataTypes.StringType), true, new MetadataBuilder().build()))
.add(new StructField("value", DataTypes.StringType, true, new MetadataBuilder().build()));
return spark.createDataFrame(rowRDD1, st);
}
对于连接标准,我们需要一个 UDF 来检查一个数组是否是另一个数组的一部分。如果元素的顺序不重要,也可以使用array_intersect。
UserDefinedFunction contains = functions.udf((Seq<String> a, Seq<String> b) -> b.containsSlice(a), DataTypes.BooleanType);
将这两个元素放在一起,我们得到
Dataset<Row> df1 = toDataframe(spark, firstRDD);
Dataset<Row> df2 = toDataframe(spark, secondRDD);
Dataset<Row> result = df1.join(df2,contains.apply(df1.col("key"), df2.col("key")));
有了输入数据
firstRDD secondRDD
+------+-----+ +------------+-----+
| key|value| | key|value|
+------+-----+ +------------+-----+
|[a, b]| A| | [a, b, c]| C|
|[b, a]| B| |[a, b, c, d]| D|
+------+-----+ +------------+-----+
我们得到
+------+-----+------------+-----+
| key|value| key|value|
+------+-----+------------+-----+
|[a, b]| A| [a, b, c]| C|
|[a, b]| A|[a, b, c, d]| D|
+------+-----+------------+-----+
请注意,使用 UDF 作为连接标准可能不是