通过内部数组的索引有效地连接数组 RDD 的数组

Efficiently concatenate array of arrays RDD by index of inner array

我正在使用 Databricks 运行 使用 Scala (v2.12) 的 Spark 集群 (v3.0.1)。我将 Scala 文件编译为 JAR,我正在 运行 使用来自 Databricks UI.

spark-submit 完成作业

程序的逻辑从创建随机种子列表并使用以下行对其进行并行化开始:

val myListRdd = sc.parallelize(myList, partitions)

接下来,我希望在这个 RDD 上 运行 一个处理函数 f(...args)args 之一是 myListRdd 的各个元素。该函数的 return 类型为 Array[Array[Double]]。所以在 Scala 中它看起来像:

val result = myListRdd.map(f(_, ...<more-args>))

现在,我希望使用以下逻辑有效地收集输出的数组数组。

来自 f(...args) 的示例输出:

Output 1: ((1.0, 1.1, 1.2), (1.3, 1.4, 1.5), ...)
Output 2: ((2.0, 2.1, 2.2), (2.3, 2.4, 2.5), ...)
Output 3: ((3.0, 3.1, 3.2), (3.3, 3.4, 3.5), ...)
... so on

现在,由于这些是 f(..args) 的多个输出,我希望使用一些 spark RDD 操作的最终输出看起来像:

Type: Array[Array[Double]]
Value: ((1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2, ...), (1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5, ...), ...)

我是 Spark 和 Scala 的新手,所以我无法将我的逻辑映射到代码。我试图在上面的代码片段中使用 flatMap 而不是 map,但它并没有给我我想要的输出。如果我尝试使用 collect 操作将输出 RDD 转换为数据帧,那么执行作业会花费很多时间,而且我仍然需要 运行 在数据帧上连接函数。

您可以尝试减少函数的输出以合并数​​组:

myListRdd.map(f(_, ...<more-args>))
         .reduce((x, y) => (0 to x.size - 1).toArray.map(i => x(i) ++ y(i)))

如果您有一个类型的多个实例(在本例中为 Array[Array[Double]])并且您需要将它们组合成该类型的单个实例,那么您可能想要 fold()(或者可能reduce()).

List(output1,output2,output3)
  .foldLeft(Array.empty[Array[Double]]){
    case (acc, aad) => aad.indices.map{idx =>
      acc.lift(idx).fold(aad(idx))(_ ++ aad(idx))
    }.toArray
  }
//res0: Array[Array[Double]] =
// Array(Array(1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2)
//     , Array(1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5))