How to write streaming dataframe to PostgreSQL?
I have a streaming dataframe that I am trying to write to a database. There is documentation for writing an rdd or df to Postgres, but I could not find examples or documentation on how to do it with Structured Streaming.
I have read the documentation https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch , but I don't understand where to create the jdbc connection and how to write it to the database.
def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()
This function takes a regular dataframe and writes it to a Postgres table:
def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://" + dbhost + ":" + dbport + "/" + dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }
    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)
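(One practical prerequisite worth noting: write.jdbc with org.postgresql.Driver only works if the PostgreSQL JDBC driver jar is on the Spark classpath. Below is a minimal sketch of one way to provide it when building the session; the app name and artifact version are assumptions, not something from the question.)

from pyspark.sql import SparkSession

# Assumed setup: pull the PostgreSQL JDBC driver by Maven coordinates so that
# write.jdbc can load org.postgresql.Driver on the driver and executors.
spark = SparkSession.builder \
    .appName("view-counts") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.5") \
    .getOrCreate()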
There is really not much left to do here beyond what you already have. foreachBatch
takes a function (DataFrame, Int) => None
, so all you need is a small adapter, and everything else should work just fine:
def foreach_batch_for_config(config):
    def _(df, epoch_id):
        postgres_sink(config, df)
    return _
view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_for_config(some_config))
    ...  # remaining options as before
    .start()
    .awaitTermination())
Though to be honest, passing a ConfigParser
around was a strange idea to begin with. You could adjust the signature and initialize it in place:
def postgres_sink(data_frame, batch_id):
    config = configparser.ConfigParser()
    ...
    data_frame.write.jdbc(...)
The rest stays as it was. That way you can use your function directly:
...
.foreachBatch(postgres_sink)
...
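For completeness, here is a minimal end-to-end sketch of that approach, filled in from the config keys, path, and table name in the question (the trigger interval and windowed_view_counts also come from the question; treat this as a sketch rather than a drop-in implementation):

import configparser

def postgres_sink(data_frame, batch_id):
    # Re-read the connection settings per batch; cheap next to the JDBC write.
    config = configparser.ConfigParser()
    config.read('/src/config/config.ini')
    url = "jdbc:postgresql://{}:{}/{}".format(
        config.get('dbauth', 'host'),
        config.get('dbauth', 'port'),
        config.get('dbauth', 'dbname'))
    properties = {
        "driver": "org.postgresql.Driver",
        "user": config.get('dbauth', 'user'),
        "password": config.get('dbauth', 'password')
    }
    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append")
    .foreachBatch(postgres_sink)
    .trigger(processingTime="5 seconds")
    .start())
view_counts_query.awaitTermination()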
An example of how Postgres ingestion can be done with Structured Streaming:
class PostgreSqlSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "INSERT INTO Table (A, B, C) VALUES (?, ?, ?)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    // ignoring value(0) as this is the address
    statement.setString(1, value(1).toString)
    statement.setString(2, value(2).toString)
    statement.setString(3, value(3).toString)
    statement.executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close()
  }
}
val url = "jdbc:postgresql://XX.XX.XX.XX:5432/postgres"
val user = "abc"
val pw = "abc@123"
val jdbcWriter = new PostgreSqlSink(url,user,pw)
val writeData = pg_df.writeStream
.foreach(jdbcWriter)
.outputMode("Append")
.trigger(ProcessingTime("30 seconds"))
.option("checkpointLocation", "s3a://bucket/check")
.start()
writeData.awaitTermination
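If you would rather stay in PySpark, a rough Python counterpart of this row-at-a-time writer might look like the sketch below, using writeStream.foreach with an object that implements open/process/close. It assumes psycopg2 is installed on the executors and that a metrics(a, b, c) table exists; those names are illustrative, not taken from the answer above.

import psycopg2

class PostgresRowSink:
    """Row-at-a-time sink for writeStream.foreach(); like the Scala writer,
    it opens one connection per partition/epoch and commits on close."""

    def __init__(self, host, port, dbname, user, password):
        self.params = dict(host=host, port=port, dbname=dbname,
                           user=user, password=password)

    def open(self, partition_id, epoch_id):
        self.conn = psycopg2.connect(**self.params)
        self.cur = self.conn.cursor()
        return True

    def process(self, row):
        self.cur.execute(
            "INSERT INTO metrics (a, b, c) VALUES (%s, %s, %s)",
            (row[0], row[1], row[2]))

    def close(self, error):
        self.conn.commit()
        self.conn.close()

writeData = (pg_df.writeStream
    .foreach(PostgresRowSink("XX.XX.XX.XX", 5432, "postgres", "abc", "abc@123"))
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "s3a://bucket/check")
    .start())
writeData.awaitTermination()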