在任务执行期间序列化在 Spark 执行器中创建的临时集合

Serialising temp collections created in Spark executors during task execution

我正在尝试找到一种有效的方法,将在任务内部创建的集合写入作业的输出文件。例如,如果我们使用 foreach 遍历 RDD,我们可以在以下代码片段中创建执行器本地的数据结构,例如 ListBuffer arr。我的问题是如何序列化 arr 并将其写入文件? (1) 我应该使用 FileWriter api 还是 Spark saveAsTextFile 会起作用? (2) 使用一个比另一个有什么优势 (3) 有没有更好的实现方式。

PS:我使用 foreach 而不是 map 的原因是因为我可能无法转换我所有的 RDD 行并且我想避免在输出。

val dataSorted: RDD[(Int, Int)] = <Some Operation>
val arr: ListBuffer = ListBuffer[(String, String)]()
dataSorted.foreach {
     case (e, r) => {
     if(e.id > 1000) {
       arr += (("a", "b"))
     }
  }
}

谢谢, 开发

您不应该使用驱动程序的变量,而应该使用累加器 - 那里有关于它们的文章和代码示例 here and here, also this 问题可能有帮助 - 有自定义的简化代码示例 AccumulatorParam

编写自己的累加器,可以添加 (String, String) 或使用内置 CollectionAccumulator。这是 AccumulatorV2 的实现,它是来自 Spark 2

的累加器的新版本

其他方法是使用 Spark 内置的 filter 和 map 函数 - 感谢@ImDarrenG 推荐 flatMap,但我认为 filter 和 map 会更容易:

val result : Array[(String, String)] = someRDD
    .filter(x => x._1 > 1000) // filter only good rows
    .map (x => ("a", "b"))
    .collect() // convert to arrat

Spark API 为您节省了一些文件处理代码,但本质上实现了相同的目的。

例外情况是,如果您不使用 HDFS 并且不希望对输出文件进行分区(分布在执行程序文件系统中)。在这种情况下,您需要将数据收集到驱动程序并使用 FileWriter 写入单个文件或多个文件,如何实现将取决于您拥有多少数据。如果您的数据多于驱动程序的内存,您将需要以不同的方式处理它。

正如在另一个答案中提到的,您在驱动程序中创建了一个数组,同时从您的执行程序中添加了项目,这在集群环境中是行不通的。这样的事情可能是映射数据和处理空值的更好方法:

val outputRDD = dataSorted.flatMap {
    case (e, r) => {
        if(e.id > 1000) {
            Some(("a", "b"))
        } else {
            None
        }
    }
 }
 // save outputRDD to file/s here using the approapriate method...