Spark:根据另一个RDD中数组的元素获取一个RDD的元素
Spark: Get elements of an RDD based on the elements of an array in another RDD
在 Spark Scala 框架中,我有一个 RDD rdd1
,其中每个元素代表矩阵的单个元素 A
:
val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}
x
代表行,
y
表示列和
v
表示矩阵A
.
中的值
我还有另一个RDD,rdd2
,形式为RDD[index, Array[(x, y)]]
,其中每个元素中的数组表示矩阵A
的元素集,存储在rdd1
,该元素中表示的特定 index
需要。
现在我需要做的是为每个 index
获取矩阵 A
元素的值,保留所有数据,包括 index
、(x,y)
和 v
。这样做的好方法是什么?
如果我没理解错的话,你的问题可以归结为:
val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
))
并且您想合并这些 RDD 以获得所有值 (index, (x, y), v)
,在这种情况下,(123, (0,0), 5.5)
和 (123, (1,0), 7.7)
?
你绝对可以使用 join
来做到这一点,因为两个 RDD 都有一个公共列 (x, y)
,但是因为其中一个实际上有一个 Array[(x, y)]
你必须分解它首先进入一组行:
val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
在 Spark Scala 框架中,我有一个 RDD rdd1
,其中每个元素代表矩阵的单个元素 A
:
val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}
x
代表行,
y
表示列和
v
表示矩阵A
.
我还有另一个RDD,rdd2
,形式为RDD[index, Array[(x, y)]]
,其中每个元素中的数组表示矩阵A
的元素集,存储在rdd1
,该元素中表示的特定 index
需要。
现在我需要做的是为每个 index
获取矩阵 A
元素的值,保留所有数据,包括 index
、(x,y)
和 v
。这样做的好方法是什么?
如果我没理解错的话,你的问题可以归结为:
val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
))
并且您想合并这些 RDD 以获得所有值 (index, (x, y), v)
,在这种情况下,(123, (0,0), 5.5)
和 (123, (1,0), 7.7)
?
你绝对可以使用 join
来做到这一点,因为两个 RDD 都有一个公共列 (x, y)
,但是因为其中一个实际上有一个 Array[(x, y)]
你必须分解它首先进入一组行:
val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)