在 Apache Spark 中连接到 SQLite
Connect to SQLite in Apache Spark
我想 运行 SQLite 数据库中所有 table 的自定义函数。功能大致相同,但取决于个人的架构table。此外,table 及其模式仅在 运行 时已知(使用指定数据库路径的参数调用程序)。
这是我目前拥有的:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for( t <- allTables) {
val df = sqlContext.table(t)
val schema = df.columns
sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x,schema))
}
到目前为止我发现的唯一提示需要提前知道table,而在我的场景中并非如此:
val tableData =
sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
.load()
我正在使用 xerial sqlite jdbc 驱动程序。那么我怎样才能只连接到数据库,而不是 table?
编辑:使用 Beryllium 的答案作为开始,我将代码更新为:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
"dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()
val myTableNames = metaData.select("tbl_name").distinct()
for (t <- myTableNames) {
println(t.toString)
val tableData = sqlContext.table(t.toString)
for (record <- tableData.select("*")) {
println(record)
}
}
至少我可以在 运行 时阅读 table 个名字,这对我来说是一个巨大的进步。但是我看不懂 tables。我都试过了
val tableData = sqlContext.table(t.toString)
和
val tableData = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
"dbtable" -> t.toString)).load()
在循环中,但在这两种情况下我都得到 NullPointerException。虽然我可以打印 table 个名称,但我似乎无法连接到它们。
最后但同样重要的是,我总是收到 SQLITE_ERROR: Connection is closed
错误。它看起来与这个问题中描述的问题相同:
您可以尝试两种选择
直接使用JDBC
- 在您的 Spark 作业中打开一个单独的普通 JDBC 连接
- 从 JDBC 元数据中获取 table 的名字
- 将这些融入您的
for
理解中
对 "dbtable" 参数使用 SQL 查询
您可以将查询指定为 dbtable
参数的值。从语法上讲,此查询必须 "look" 类似于 table,因此它必须包含在子查询中。
在该查询中,从数据库中获取元数据:
val df = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:xxx",
"user" -> "x",
"password" -> "x",
"dbtable" -> "(select * from pg_tables) as t")).load()
此示例适用于 PostgreSQL,您必须针对 SQLite 进行调整。
更新
JDBC驱动似乎只支持迭代一个结果集。
无论如何,当您使用 collect()
实现 table 名称列表时,以下代码段应该有效:
val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()
for (t <- myTableNames) {
println(t.toString)
val tableData = sqlContext.read.format("jdbc")
.options(
Map(
"url" -> "jdbc:sqlite:/x.db",
"dbtable" -> t)).load()
tableData.show()
}
我想 运行 SQLite 数据库中所有 table 的自定义函数。功能大致相同,但取决于个人的架构table。此外,table 及其模式仅在 运行 时已知(使用指定数据库路径的参数调用程序)。
这是我目前拥有的:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for( t <- allTables) {
val df = sqlContext.table(t)
val schema = df.columns
sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x,schema))
}
到目前为止我发现的唯一提示需要提前知道table,而在我的场景中并非如此:
val tableData =
sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
.load()
我正在使用 xerial sqlite jdbc 驱动程序。那么我怎样才能只连接到数据库,而不是 table?
编辑:使用 Beryllium 的答案作为开始,我将代码更新为:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
"dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()
val myTableNames = metaData.select("tbl_name").distinct()
for (t <- myTableNames) {
println(t.toString)
val tableData = sqlContext.table(t.toString)
for (record <- tableData.select("*")) {
println(record)
}
}
至少我可以在 运行 时阅读 table 个名字,这对我来说是一个巨大的进步。但是我看不懂 tables。我都试过了
val tableData = sqlContext.table(t.toString)
和
val tableData = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
"dbtable" -> t.toString)).load()
在循环中,但在这两种情况下我都得到 NullPointerException。虽然我可以打印 table 个名称,但我似乎无法连接到它们。
最后但同样重要的是,我总是收到 SQLITE_ERROR: Connection is closed
错误。它看起来与这个问题中描述的问题相同:
您可以尝试两种选择
直接使用JDBC
- 在您的 Spark 作业中打开一个单独的普通 JDBC 连接
- 从 JDBC 元数据中获取 table 的名字
- 将这些融入您的
for
理解中
对 "dbtable" 参数使用 SQL 查询
您可以将查询指定为 dbtable
参数的值。从语法上讲,此查询必须 "look" 类似于 table,因此它必须包含在子查询中。
在该查询中,从数据库中获取元数据:
val df = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:xxx",
"user" -> "x",
"password" -> "x",
"dbtable" -> "(select * from pg_tables) as t")).load()
此示例适用于 PostgreSQL,您必须针对 SQLite 进行调整。
更新
JDBC驱动似乎只支持迭代一个结果集。
无论如何,当您使用 collect()
实现 table 名称列表时,以下代码段应该有效:
val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()
for (t <- myTableNames) {
println(t.toString)
val tableData = sqlContext.read.format("jdbc")
.options(
Map(
"url" -> "jdbc:sqlite:/x.db",
"dbtable" -> t)).load()
tableData.show()
}