使用 JDBC 连接器在 spark 中读取 MySQL table 的一部分
Read a part of a MySQL table in spark using JDBC connector
我正在尝试使用 pyspark 中的 JDBC 连接器从 MySQL 数据库中读取 table。我读取 table 的脚本是:
query = "SELECT * FROM C WHERE hitId = 4235441"
readConfig = {
"driver": driver,
"url": url,
"dbtable": tableName,
"user": user,
"password": password,
"query_custom": query
}
saveLocation = mountPoint + "/" + tableName
print(saveLocation)
readDF = spark.read.format("jdbc").options(**readConfig).schema(tableSchema).load()
readDF.write.format("delta").option("mergeSchemas", "True").mode("overwrite").save(saveLocation)
我试图只读取 hitId 为 4235441 的特定行。
问题是,仍然读取整个 table 而不是满足自定义查询的行。 任何人都可以指出我的脚本有什么问题,或者是否有人知道任何其他实现方法objective?
我被困了很长时间,所以非常感谢任何帮助。
在 readConfig
附近的 dbtable
选项中,您正在指定 table_name。而是尝试像下面那样指定 query
query = "SELECT * FROM C WHERE hitId = 4235441"
readConfig = {
"driver": driver,
"url": url,
"dbtable": query,
"user": user,
"password": password,
}
我正在尝试使用 pyspark 中的 JDBC 连接器从 MySQL 数据库中读取 table。我读取 table 的脚本是:
query = "SELECT * FROM C WHERE hitId = 4235441"
readConfig = {
"driver": driver,
"url": url,
"dbtable": tableName,
"user": user,
"password": password,
"query_custom": query
}
saveLocation = mountPoint + "/" + tableName
print(saveLocation)
readDF = spark.read.format("jdbc").options(**readConfig).schema(tableSchema).load()
readDF.write.format("delta").option("mergeSchemas", "True").mode("overwrite").save(saveLocation)
我试图只读取 hitId 为 4235441 的特定行。
问题是,仍然读取整个 table 而不是满足自定义查询的行。 任何人都可以指出我的脚本有什么问题,或者是否有人知道任何其他实现方法objective?
我被困了很长时间,所以非常感谢任何帮助。
在 readConfig
附近的 dbtable
选项中,您正在指定 table_name。而是尝试像下面那样指定 query
query = "SELECT * FROM C WHERE hitId = 4235441"
readConfig = {
"driver": driver,
"url": url,
"dbtable": query,
"user": user,
"password": password,
}