在独立的 spark 上合并镶木地板文件
Merge parquet file on standalone spark
有没有简单的方法如何将 DataFrame
保存到 单个 parquet
文件或合并包含元数据和此 [=11] 的部分内容的目录=] 将 sqlContext.saveAsParquetFile()
生成的文件转换为存储在 NFS 上的 单个 文件,而不使用 HDFS 和 hadoop?
要只保存一个文件,而不是多个文件,您可以在保存数据之前在 RDD/Dataframe 上调用 coalesce(1)
/ repartition(1)
。
如果您已经有一个包含小文件的目录,您可以创建一个 Compacter 进程来读取现有文件并将它们保存到一个新文件中。例如
val rows = parquetFile(...).coalesce(1)
rows.saveAsParquetFile(...)
您可以使用 saveAsParquetFile 存储到本地文件系统。例如
rows.saveAsParquetFile("/tmp/onefile/")
我能够使用这种方法在 Spark 1.6.1 中使用 snappy 格式压缩 parquet 文件。我使用了覆盖,以便在需要时可以重复该过程。这是代码。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
object CompressApp {
val serverPort = "hdfs://myserver:8020/"
val inputUri = serverPort + "input"
val outputUri = serverPort + "output"
val config = new SparkConf()
.setAppName("compress-app")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(config)
val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
import sqlContext.implicits._
def main(args: Array[String]) {
println("Compressing Parquet...")
val df = sqlContext.read.parquet(inputUri).coalesce(1)
df.write.mode(SaveMode.Overwrite).parquet(outputUri)
println("Done.")
}
}
到目前为止,coalesce(N)
救了我。
如果您的 table 已分区,则也使用 repartition("partition key")
。
有没有简单的方法如何将 DataFrame
保存到 单个 parquet
文件或合并包含元数据和此 [=11] 的部分内容的目录=] 将 sqlContext.saveAsParquetFile()
生成的文件转换为存储在 NFS 上的 单个 文件,而不使用 HDFS 和 hadoop?
要只保存一个文件,而不是多个文件,您可以在保存数据之前在 RDD/Dataframe 上调用 coalesce(1)
/ repartition(1)
。
如果您已经有一个包含小文件的目录,您可以创建一个 Compacter 进程来读取现有文件并将它们保存到一个新文件中。例如
val rows = parquetFile(...).coalesce(1)
rows.saveAsParquetFile(...)
您可以使用 saveAsParquetFile 存储到本地文件系统。例如
rows.saveAsParquetFile("/tmp/onefile/")
我能够使用这种方法在 Spark 1.6.1 中使用 snappy 格式压缩 parquet 文件。我使用了覆盖,以便在需要时可以重复该过程。这是代码。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
object CompressApp {
val serverPort = "hdfs://myserver:8020/"
val inputUri = serverPort + "input"
val outputUri = serverPort + "output"
val config = new SparkConf()
.setAppName("compress-app")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(config)
val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
import sqlContext.implicits._
def main(args: Array[String]) {
println("Compressing Parquet...")
val df = sqlContext.read.parquet(inputUri).coalesce(1)
df.write.mode(SaveMode.Overwrite).parquet(outputUri)
println("Done.")
}
}
coalesce(N)
救了我。
如果您的 table 已分区,则也使用 repartition("partition key")
。