在独立的 spark 上合并镶木地板文件

Merge parquet file on standalone spark

有没有简单的方法如何将 DataFrame 保存到 单个 parquet 文件或合并包含元数据和此 [=11] 的部分内容的目录=] 将 sqlContext.saveAsParquetFile() 生成的文件转换为存储在 NFS 上的 单个 文件,而不使用 HDFS 和 hadoop?

要只保存一个文件,而不是多个文件,您可以在保存数据之前在 RDD/Dataframe 上调用 coalesce(1) / repartition(1)

如果您已经有一个包含小文件的目录,您可以创建一个 Compacter 进程来读取现有文件并将它们保存到一个新文件中。例如

val rows = parquetFile(...).coalesce(1)
rows.saveAsParquetFile(...)

您可以使用 saveAsParquetFile 存储到本地文件系统。例如

rows.saveAsParquetFile("/tmp/onefile/")

我能够使用这种方法在 Spark 1.6.1 中使用 snappy 格式压缩 parquet 文件。我使用了覆盖,以便在需要时可以重复该过程。这是代码。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

object CompressApp {
  val serverPort = "hdfs://myserver:8020/"
  val inputUri = serverPort + "input"
  val outputUri = serverPort + "output"

  val config = new SparkConf()
           .setAppName("compress-app")
           .setMaster("local[*]")
  val sc = SparkContext.getOrCreate(config)
  val sqlContext = SQLContext.getOrCreate(sc)
  sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")  
  import sqlContext.implicits._

  def main(args: Array[String]) {
    println("Compressing Parquet...")
    val df = sqlContext.read.parquet(inputUri).coalesce(1)
    df.write.mode(SaveMode.Overwrite).parquet(outputUri)
    println("Done.")
  }
}
到目前为止,

coalesce(N) 救了我。
如果您的 table 已分区,则也使用 repartition("partition key")