持久化 Spark 数据框

Persisting Spark Dataframe

我是 Spark 世界的新手。我们如何持久化 Dataframe 以便我们可以跨组件使用它。

我有一个 Kafka 流,我正在通过 Rdd.Tried RegisterAsTempTable 生成 Dataframe,但是 table 无法在另一个程序中访问。

我想通过sqlContext在另一个class中访问这个Dataframe,并使用查询结果进行进一步的计算。

使用 saveAsTable 命令,

DataFrames 也可以 saved 作为持久性 tables 进入 Hive 元存储。请注意,现有的 Hive 部署并不是使用此功能所必需的。 Spark 将为您创建默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将具体化 DataFrame 的内容并创建指向 Hive 元存储中数据的指针。

即使在您的 Spark 程序重新启动后,持久性 tables 仍然存在,只要您保持与同一元存储的连接。可以通过调用名为 table.

的 SparkSession 上的 table 方法来创建持久性 table 的 DataFrame

默认情况下,saveAsTable 将创建一个“托管table”,这意味着数据的位置将由元存储控制。当删除 table 时,托管 table 的数据也会自动删除。

您可以将DataFrame 的内容保存为Parquet 文件,并在另一个程序中读取该文件。您可以在接下来的program.Spark中注册为Temp table SQL 提供对自动保留原始数据模式的读写Parquet 文件的支持。

//First Program
dataframe.write.format("parquet").save("/tmp/xyz-dir/card.parquet")
//where /tmp/xyz-dir/ is a HDFS directory

//Second Program
val parquetRead = sqlContext.read.format("parquet").load("/tmp/xyz-dir/card.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetRead.registerTempTable("parquettemptable")
val cust= sqlContext.sql("SELECT name FROM parquettemptable")

//After use of parquet file, delete the same in the second program
val fs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://hostname:8030"), sc.hadoopConfiguration)
fs.delete(new org.apache.hadoop.fs.Path("/tmp/xyz-dir"),true) // isRecusrive= true