spark-rdbms:覆盖模式的工作方式不同于追加
spark-rdbms : Overwrite mode is working different from Append
我正在使用 Spark 3.0.0 预览版并尝试将数据集保存到 PostgreSQL 数据库。以下是我正在执行的步骤:
- 从tableA
获取数据
- 从table B 获取数据(与A 结构相同)
- 做左反连接b/w Table A和B。这样做是为了从Table B中获取不在Table A[=中的行23=]
- 将 Table A 与步骤 3 的结果相结合。这样做是为了从 table A 和 B 中获取唯一的行。
- 用Override模式保存结果到Table B
实际:只有来自 table_a 的行在数据库中得到更新。
预期:table a 和步骤 3 的记录联合应该在数据库中得到更新。
分析:如果我将模式用作 'Append',则记录数是正确的,但我正在寻找截断 table 而不是追加。
代码:
val spark = SparkSession.builder.master("local[*]").appName("Testing")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val tableA = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from (select uid, employer_key, name , row_number() over(partition by employer_key order by updated_at desc) as rn from test.table_a) t where t.rn = 1")
.load()
val tableB = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load()
val nonUpdatedDFRows = tableB.join(tableA, tableB("employer_key") === tableA("employer_key"), "leftanti")
nonUpdatedDFRows.show(5) //Working correctly
val refreshDF = nonUpdatedDFRows.unionByName(tableA)
refreshDF.show(5) //Working correctly
refreshDF.write.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("dbtable", "test.table_b")
.option("truncate", "true").mode("overwrite")
.save();
//only rows from table_a get updated in the DB but if I change the mode to Append, it will work fine.
我的代码中的问题是我试图覆盖我正在阅读的同一个 table。
为了解决这个问题,我必须首先缓存值,如下所示:
val tableB = spark.read.format("jdbc").option("url",
"jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load().cahce()
做一些强制spark加载数据的操作,如下:
tableB.show(2)
我正在使用 Spark 3.0.0 预览版并尝试将数据集保存到 PostgreSQL 数据库。以下是我正在执行的步骤:
- 从tableA 获取数据
- 从table B 获取数据(与A 结构相同)
- 做左反连接b/w Table A和B。这样做是为了从Table B中获取不在Table A[=中的行23=]
- 将 Table A 与步骤 3 的结果相结合。这样做是为了从 table A 和 B 中获取唯一的行。
- 用Override模式保存结果到Table B
实际:只有来自 table_a 的行在数据库中得到更新。 预期:table a 和步骤 3 的记录联合应该在数据库中得到更新。 分析:如果我将模式用作 'Append',则记录数是正确的,但我正在寻找截断 table 而不是追加。
代码:
val spark = SparkSession.builder.master("local[*]").appName("Testing")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val tableA = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from (select uid, employer_key, name , row_number() over(partition by employer_key order by updated_at desc) as rn from test.table_a) t where t.rn = 1")
.load()
val tableB = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load()
val nonUpdatedDFRows = tableB.join(tableA, tableB("employer_key") === tableA("employer_key"), "leftanti")
nonUpdatedDFRows.show(5) //Working correctly
val refreshDF = nonUpdatedDFRows.unionByName(tableA)
refreshDF.show(5) //Working correctly
refreshDF.write.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("dbtable", "test.table_b")
.option("truncate", "true").mode("overwrite")
.save();
//only rows from table_a get updated in the DB but if I change the mode to Append, it will work fine.
我的代码中的问题是我试图覆盖我正在阅读的同一个 table。
为了解决这个问题,我必须首先缓存值,如下所示:
val tableB = spark.read.format("jdbc").option("url",
"jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load().cahce()
做一些强制spark加载数据的操作,如下:
tableB.show(2)