spark-rdbms:覆盖模式的工作方式不同于追加

spark-rdbms : Overwrite mode is working different from Append

我正在使用 Spark 3.0.0 预览版并尝试将数据集保存到 PostgreSQL 数据库。以下是我正在执行的步骤:

  1. 从tableA
  2. 获取数据
  3. 从table B 获取数据(与A 结构相同)
  4. 做左反连接b/w Table A和B。这样做是为了从Table B中获取不在Table A[=中的行23=]
  5. 将 Table A 与步骤 3 的结果相结合。这样做是为了从 table A 和 B 中获取唯一的行。
  6. 用Override模式保存结果到Table B

实际:只有来自 table_a 的行在数据库中得到更新。 预期:table a 和步骤 3 的记录联合应该在数据库中得到更新。 分析:如果我将模式用作 'Append',则记录数是正确的,但我正在寻找截断 table 而不是追加。

代码:

val spark = SparkSession.builder.master("local[*]").appName("Testing")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

val tableA = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from (select uid, employer_key, name , row_number() over(partition by employer_key order by updated_at desc) as rn from test.table_a) t where t.rn = 1")
.load()

 val tableB = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load()

val nonUpdatedDFRows = tableB.join(tableA, tableB("employer_key") === tableA("employer_key"), "leftanti")

nonUpdatedDFRows.show(5) //Working correctly

val refreshDF = nonUpdatedDFRows.unionByName(tableA)

refreshDF.show(5) //Working correctly

refreshDF.write.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("dbtable", "test.table_b")
.option("truncate", "true").mode("overwrite")
.save();

//only rows from table_a get updated in the DB but if I change the mode to Append, it will work fine.

我的代码中的问题是我试图覆盖我正在阅读的同一个 table。

为了解决这个问题,我必须首先缓存值,如下所示:

val tableB = spark.read.format("jdbc").option("url", 
"jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load().cahce()

做一些强制spark加载数据的操作,如下:

tableB.show(2)