Spark 数据帧 UPSERT 到 Postgres Table

Spark Dataframes UPSERT to Postgres Table

我正在使用 Apache Spark DataFrames 连接两个数据源并将结果作为另一个 DataFrame。我想将结果写入另一个 Postgres table。我看到这个选项:

myDataFrame.write.jdbc(url, table, connectionProperties)

但是,我想做的是根据 Table 的主键将数据帧 UPSERT 到 table。如何做到这一点?我正在使用 Spark 1.6.0.

不支持。 DataFrameWriter 可以追加或覆盖现有的 table。如果您的应用程序需要更复杂的逻辑,您将不得不手动处理。

一种选择是使用具有标准 JDBC 连接的操作(foreachforeachPartition)。另一种是写入临时文件并直接在数据库中处理其余部分。

另见 SPARK-19335Spark 应支持通过 JDBC 执行高效的 DataFrame 更新插入)和相关提案。

如果您打算通过 zero323 提到的选项 1 手动完成,您应该看看 Spark source code for the insert statement here

  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
    val columns = rddSchema.fields.map(_.name).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }

PreparedStatementpart of java.sql 并且它有像 execute()executeUpdate() 这样的方法。当然,您仍然需要相应地修改 sql

要插入 JDBC 你可以使用

dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)

此外,Dataframe.write 为您提供了一个 DataFrameWriter,它具有一些插入数据框的方法。

def insertInto(tableName: String): Unit

将DataFrame的内容插入指定的table。它要求 DataFrame 的架构与 table.

的架构相同

因为它将数据插入到现有的 table,格式或选项将被忽略。

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

虽然还没有从 spark 中开箱即用地更新个人记录

KrisP 拥有它的权利。执行 upsert 的最佳方法不是通过准备好的语句。重要的是要注意,此方法将一次插入一个,分区数量与您拥有的工作人员数量一样多。如果您想批量执行此操作,您也可以

import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")

  batch.grouped("# Of Rows you want per batch").foreach { session =>
    session.foreach { x =>
      st.setDouble(1, x.getDouble(1)) 
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}

这将为每个工作人员执行批处理并关闭数据库连接。它使您可以控制多少工人、多少批次并允许您在这些范围内工作。