Spark 数据帧 UPSERT 到 Postgres Table
Spark Dataframes UPSERT to Postgres Table
我正在使用 Apache Spark DataFrames 连接两个数据源并将结果作为另一个 DataFrame。我想将结果写入另一个 Postgres table。我看到这个选项:
myDataFrame.write.jdbc(url, table, connectionProperties)
但是,我想做的是根据 Table 的主键将数据帧 UPSERT 到 table。如何做到这一点?我正在使用 Spark 1.6.0.
不支持。 DataFrameWriter
可以追加或覆盖现有的 table。如果您的应用程序需要更复杂的逻辑,您将不得不手动处理。
一种选择是使用具有标准 JDBC 连接的操作(foreach
、foreachPartition
)。另一种是写入临时文件并直接在数据库中处理其余部分。
另见 SPARK-19335(Spark 应支持通过 JDBC 执行高效的 DataFrame 更新插入)和相关提案。
如果您打算通过 zero323 提到的选项 1 手动完成,您应该看看 Spark source code for the insert statement here
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
PreparedStatement
是 part of java.sql
并且它有像 execute()
和 executeUpdate()
这样的方法。当然,您仍然需要相应地修改 sql
。
要插入 JDBC 你可以使用
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
此外,Dataframe.write 为您提供了一个 DataFrameWriter,它具有一些插入数据框的方法。
def insertInto(tableName: String): Unit
将DataFrame的内容插入指定的table。它要求 DataFrame 的架构与 table.
的架构相同
因为它将数据插入到现有的 table,格式或选项将被忽略。
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
虽然还没有从 spark 中开箱即用地更新个人记录
KrisP 拥有它的权利。执行 upsert 的最佳方法不是通过准备好的语句。重要的是要注意,此方法将一次插入一个,分区数量与您拥有的工作人员数量一样多。如果您想批量执行此操作,您也可以
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
这将为每个工作人员执行批处理并关闭数据库连接。它使您可以控制多少工人、多少批次并允许您在这些范围内工作。
我正在使用 Apache Spark DataFrames 连接两个数据源并将结果作为另一个 DataFrame。我想将结果写入另一个 Postgres table。我看到这个选项:
myDataFrame.write.jdbc(url, table, connectionProperties)
但是,我想做的是根据 Table 的主键将数据帧 UPSERT 到 table。如何做到这一点?我正在使用 Spark 1.6.0.
不支持。 DataFrameWriter
可以追加或覆盖现有的 table。如果您的应用程序需要更复杂的逻辑,您将不得不手动处理。
一种选择是使用具有标准 JDBC 连接的操作(foreach
、foreachPartition
)。另一种是写入临时文件并直接在数据库中处理其余部分。
另见 SPARK-19335(Spark 应支持通过 JDBC 执行高效的 DataFrame 更新插入)和相关提案。
如果您打算通过 zero323 提到的选项 1 手动完成,您应该看看 Spark source code for the insert statement here
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
PreparedStatement
是 part of java.sql
并且它有像 execute()
和 executeUpdate()
这样的方法。当然,您仍然需要相应地修改 sql
。
要插入 JDBC 你可以使用
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
此外,Dataframe.write 为您提供了一个 DataFrameWriter,它具有一些插入数据框的方法。
def insertInto(tableName: String): Unit
将DataFrame的内容插入指定的table。它要求 DataFrame 的架构与 table.
的架构相同因为它将数据插入到现有的 table,格式或选项将被忽略。
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
虽然还没有从 spark 中开箱即用地更新个人记录
KrisP 拥有它的权利。执行 upsert 的最佳方法不是通过准备好的语句。重要的是要注意,此方法将一次插入一个,分区数量与您拥有的工作人员数量一样多。如果您想批量执行此操作,您也可以
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
这将为每个工作人员执行批处理并关闭数据库连接。它使您可以控制多少工人、多少批次并允许您在这些范围内工作。