覆盖 Hive 分区数据时的 Spark Dataframe 问题 table
Spark Dataframe issue in overwriting the partition data of Hive table
下面是我的 Hive table 定义:
CREATE EXTERNAL TABLE IF NOT EXISTS default.test2(
id integer,
count integer
)
PARTITIONED BY (
fac STRING,
fiscaldate_str DATE )
STORED AS PARQUET
LOCATION 's3://<bucket name>/backup/test2';
我在配置单元 table 中有如下数据,(我刚刚插入了示例数据)
select * from default.test2
+---+-----+----+--------------+
| id|count| fac|fiscaldate_str|
+---+-----+----+--------------+
| 2| 3| NRM| 2019-01-01|
| 1| 2| NRM| 2019-01-01|
| 2| 3| NRM| 2019-01-02|
| 1| 2| NRM| 2019-01-02|
| 2| 3| NRM| 2019-01-03|
| 1| 2| NRM| 2019-01-03|
| 2| 3|STST| 2019-01-01|
| 1| 2|STST| 2019-01-01|
| 2| 3|STST| 2019-01-02|
| 1| 2|STST| 2019-01-02|
| 2| 3|STST| 2019-01-03|
| 1| 2|STST| 2019-01-03|
+---+-----+----+--------------+
这个 table 在两列上分区(fac,fiscaldate_str),我们正在尝试使用 spark 数据帧 - 数据帧编写器在分区级别动态执行插入覆盖。
但是,尝试此操作时,我们要么以重复数据结束,要么所有其他分区都被删除。
下面是使用 spark 数据框的代码片段。
首先,我将数据框创建为
df = spark.createDataFrame([(99,99,'NRM','2019-01-01'),(999,999,'NRM','2019-01-01')], ['id','count','fac','fiscaldate_str'])
df.show(2,False)
+---+-----+---+--------------+
|id |count|fac|fiscaldate_str|
+---+-----+---+--------------+
|99 |99 |NRM|2019-01-01 |
|999|999 |NRM|2019-01-01 |
+---+-----+---+--------------+
使用以下代码段复制,
df.coalesce(1).write.mode("overwrite").insertInto("default.test2")
删除所有其他数据,只有新数据可用。
df.coalesce(1).write.mode("overwrite").saveAsTable("default.test2")
或
df.createOrReplaceTempView("tempview")
tbl_ald_kpiv_hist_insert = spark.sql("""
INSERT OVERWRITE TABLE default.test2
partition(fac,fiscaldate_str)
select * from tempview
""")
我正在将 AWS EMR 与 Spark 2.4.0 和 Hive 2.3.4-amzn-1 以及 S3 一起使用。
谁能知道为什么我无法将数据动态覆盖到分区中?
你的问题不太容易理解,但我认为你的意思是你想覆盖一个分区。如果是这样,那么这就是你所需要的,你所需要的——第二行:
df = spark.createDataFrame([(99,99,'AAA','2019-01-02'),(999,999,'BBB','2019-01-01')], ['id','count','fac','fiscaldate_str'])
df.coalesce(1).write.mode("overwrite").insertInto("test2",overwrite=True)
注意 overwrite=True。由于正在使用 DF.writer,因此所做的评论既不在这里也不在那里。我不是在解决 coalesce(1).
对提问者的评论
我 运行 这是我的标准做法 - 在此处进行原型设计和回答时 - 在 Databricks Notebook 上明确设置以下内容并且工作正常:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","static")
spark.conf.set("hive.exec.dynamic.partition.mode", "strict")
您要求更新答案:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic").
可以像我刚才那样做;在您的环境中可能需要这样做,但我确实不需要这样做。
20 年 3 月 19 日更新
这适用于之前的 Spark 版本,现在适用于以下应用程序:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// In Databricks did not matter the below settings
//spark.conf.set("hive.exec.dynamic.partition", "true")
//spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
Seq(("CompanyA1", "A"), ("CompanyA2", "A"),
("CompanyB1", "B"))
.toDF("company", "id")
.write
.mode(SaveMode.Overwrite)
.partitionBy("id")
.saveAsTable("KQCAMS9")
spark.sql(s"SELECT * FROM KQCAMS9").show(false)
val df = Seq(("CompanyA3", "A"))
.toDF("company", "id")
// disregard coalsece
df.coalesce(1).write.mode("overwrite").insertInto("KQCAMS9")
spark.sql(s"SELECT * FROM KQCAMS9").show(false)
spark.sql(s"show partitions KQCAMS9").show(false)
从 2.4.x 开始就这样了。以后。
下面是我的 Hive table 定义:
CREATE EXTERNAL TABLE IF NOT EXISTS default.test2(
id integer,
count integer
)
PARTITIONED BY (
fac STRING,
fiscaldate_str DATE )
STORED AS PARQUET
LOCATION 's3://<bucket name>/backup/test2';
我在配置单元 table 中有如下数据,(我刚刚插入了示例数据)
select * from default.test2
+---+-----+----+--------------+
| id|count| fac|fiscaldate_str|
+---+-----+----+--------------+
| 2| 3| NRM| 2019-01-01|
| 1| 2| NRM| 2019-01-01|
| 2| 3| NRM| 2019-01-02|
| 1| 2| NRM| 2019-01-02|
| 2| 3| NRM| 2019-01-03|
| 1| 2| NRM| 2019-01-03|
| 2| 3|STST| 2019-01-01|
| 1| 2|STST| 2019-01-01|
| 2| 3|STST| 2019-01-02|
| 1| 2|STST| 2019-01-02|
| 2| 3|STST| 2019-01-03|
| 1| 2|STST| 2019-01-03|
+---+-----+----+--------------+
这个 table 在两列上分区(fac,fiscaldate_str),我们正在尝试使用 spark 数据帧 - 数据帧编写器在分区级别动态执行插入覆盖。
但是,尝试此操作时,我们要么以重复数据结束,要么所有其他分区都被删除。
下面是使用 spark 数据框的代码片段。
首先,我将数据框创建为
df = spark.createDataFrame([(99,99,'NRM','2019-01-01'),(999,999,'NRM','2019-01-01')], ['id','count','fac','fiscaldate_str'])
df.show(2,False)
+---+-----+---+--------------+
|id |count|fac|fiscaldate_str|
+---+-----+---+--------------+
|99 |99 |NRM|2019-01-01 |
|999|999 |NRM|2019-01-01 |
+---+-----+---+--------------+
使用以下代码段复制,
df.coalesce(1).write.mode("overwrite").insertInto("default.test2")
删除所有其他数据,只有新数据可用。
df.coalesce(1).write.mode("overwrite").saveAsTable("default.test2")
或
df.createOrReplaceTempView("tempview")
tbl_ald_kpiv_hist_insert = spark.sql("""
INSERT OVERWRITE TABLE default.test2
partition(fac,fiscaldate_str)
select * from tempview
""")
我正在将 AWS EMR 与 Spark 2.4.0 和 Hive 2.3.4-amzn-1 以及 S3 一起使用。
谁能知道为什么我无法将数据动态覆盖到分区中?
你的问题不太容易理解,但我认为你的意思是你想覆盖一个分区。如果是这样,那么这就是你所需要的,你所需要的——第二行:
df = spark.createDataFrame([(99,99,'AAA','2019-01-02'),(999,999,'BBB','2019-01-01')], ['id','count','fac','fiscaldate_str'])
df.coalesce(1).write.mode("overwrite").insertInto("test2",overwrite=True)
注意 overwrite=True。由于正在使用 DF.writer,因此所做的评论既不在这里也不在那里。我不是在解决 coalesce(1).
对提问者的评论
我 运行 这是我的标准做法 - 在此处进行原型设计和回答时 - 在 Databricks Notebook 上明确设置以下内容并且工作正常:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","static")
spark.conf.set("hive.exec.dynamic.partition.mode", "strict")
您要求更新答案:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic").
可以像我刚才那样做;在您的环境中可能需要这样做,但我确实不需要这样做。
20 年 3 月 19 日更新
这适用于之前的 Spark 版本,现在适用于以下应用程序:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// In Databricks did not matter the below settings
//spark.conf.set("hive.exec.dynamic.partition", "true")
//spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
Seq(("CompanyA1", "A"), ("CompanyA2", "A"),
("CompanyB1", "B"))
.toDF("company", "id")
.write
.mode(SaveMode.Overwrite)
.partitionBy("id")
.saveAsTable("KQCAMS9")
spark.sql(s"SELECT * FROM KQCAMS9").show(false)
val df = Seq(("CompanyA3", "A"))
.toDF("company", "id")
// disregard coalsece
df.coalesce(1).write.mode("overwrite").insertInto("KQCAMS9")
spark.sql(s"SELECT * FROM KQCAMS9").show(false)
spark.sql(s"show partitions KQCAMS9").show(false)
从 2.4.x 开始就这样了。以后。