通过内部数组的索引有效地连接数组 RDD 的数组
Efficiently concatenate array of arrays RDD by index of inner array
我正在使用 Databricks 运行 使用 Scala (v2.12) 的 Spark 集群 (v3.0.1)。我将 Scala 文件编译为 JAR,我正在 运行 使用来自 Databricks UI.
的 spark-submit
完成作业
程序的逻辑从创建随机种子列表并使用以下行对其进行并行化开始:
val myListRdd = sc.parallelize(myList, partitions)
接下来,我希望在这个 RDD 上 运行 一个处理函数 f(...args)
,args
之一是 myListRdd
的各个元素。该函数的 return 类型为 Array[Array[Double]]
。所以在 Scala 中它看起来像:
val result = myListRdd.map(f(_, ...<more-args>))
现在,我希望使用以下逻辑有效地收集输出的数组数组。
来自 f(...args)
的示例输出:
Output 1: ((1.0, 1.1, 1.2), (1.3, 1.4, 1.5), ...)
Output 2: ((2.0, 2.1, 2.2), (2.3, 2.4, 2.5), ...)
Output 3: ((3.0, 3.1, 3.2), (3.3, 3.4, 3.5), ...)
... so on
现在,由于这些是 f(..args)
的多个输出,我希望使用一些 spark RDD 操作的最终输出看起来像:
Type: Array[Array[Double]]
Value: ((1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2, ...), (1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5, ...), ...)
我是 Spark 和 Scala 的新手,所以我无法将我的逻辑映射到代码。我试图在上面的代码片段中使用 flatMap
而不是 map
,但它并没有给我我想要的输出。如果我尝试使用 collect
操作将输出 RDD 转换为数据帧,那么执行作业会花费很多时间,而且我仍然需要 运行 在数据帧上连接函数。
您可以尝试减少函数的输出以合并数组:
myListRdd.map(f(_, ...<more-args>))
.reduce((x, y) => (0 to x.size - 1).toArray.map(i => x(i) ++ y(i)))
如果您有一个类型的多个实例(在本例中为 Array[Array[Double]]
)并且您需要将它们组合成该类型的单个实例,那么您可能想要 fold()
(或者可能reduce()
).
List(output1,output2,output3)
.foldLeft(Array.empty[Array[Double]]){
case (acc, aad) => aad.indices.map{idx =>
acc.lift(idx).fold(aad(idx))(_ ++ aad(idx))
}.toArray
}
//res0: Array[Array[Double]] =
// Array(Array(1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2)
// , Array(1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5))
我正在使用 Databricks 运行 使用 Scala (v2.12) 的 Spark 集群 (v3.0.1)。我将 Scala 文件编译为 JAR,我正在 运行 使用来自 Databricks UI.
的spark-submit
完成作业
程序的逻辑从创建随机种子列表并使用以下行对其进行并行化开始:
val myListRdd = sc.parallelize(myList, partitions)
接下来,我希望在这个 RDD 上 运行 一个处理函数 f(...args)
,args
之一是 myListRdd
的各个元素。该函数的 return 类型为 Array[Array[Double]]
。所以在 Scala 中它看起来像:
val result = myListRdd.map(f(_, ...<more-args>))
现在,我希望使用以下逻辑有效地收集输出的数组数组。
来自 f(...args)
的示例输出:
Output 1: ((1.0, 1.1, 1.2), (1.3, 1.4, 1.5), ...)
Output 2: ((2.0, 2.1, 2.2), (2.3, 2.4, 2.5), ...)
Output 3: ((3.0, 3.1, 3.2), (3.3, 3.4, 3.5), ...)
... so on
现在,由于这些是 f(..args)
的多个输出,我希望使用一些 spark RDD 操作的最终输出看起来像:
Type: Array[Array[Double]]
Value: ((1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2, ...), (1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5, ...), ...)
我是 Spark 和 Scala 的新手,所以我无法将我的逻辑映射到代码。我试图在上面的代码片段中使用 flatMap
而不是 map
,但它并没有给我我想要的输出。如果我尝试使用 collect
操作将输出 RDD 转换为数据帧,那么执行作业会花费很多时间,而且我仍然需要 运行 在数据帧上连接函数。
您可以尝试减少函数的输出以合并数组:
myListRdd.map(f(_, ...<more-args>))
.reduce((x, y) => (0 to x.size - 1).toArray.map(i => x(i) ++ y(i)))
如果您有一个类型的多个实例(在本例中为 Array[Array[Double]]
)并且您需要将它们组合成该类型的单个实例,那么您可能想要 fold()
(或者可能reduce()
).
List(output1,output2,output3)
.foldLeft(Array.empty[Array[Double]]){
case (acc, aad) => aad.indices.map{idx =>
acc.lift(idx).fold(aad(idx))(_ ++ aad(idx))
}.toArray
}
//res0: Array[Array[Double]] =
// Array(Array(1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2)
// , Array(1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5))