Mysql Spark 中的数据处理
Mysql data processing in Spark
我有一个要求,我需要每 5 分钟从多个源系统(Mysql 个实例)获取数据,并用其他数据(假设存在于 S3 中)加入并丰富它们。
我想在 Spark 中进行此处理以将我的执行分配给多个执行程序。
主要问题是每次我在 Mysql 中进行查找时,我只想获取最新记录(比如使用 lastModifiedOn > 时间戳)。
这种 MySql 行的选择性提取如何有效地发生?
这是我试过的:
val filmDf = sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/sakila")
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "")
.load()
您应该将 spark sql 与 jdbc 数据源一起使用。我给你举个例子。
val res = spark.read.jdbc(
url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb",
table = "TEST.table",
columnName = "lastModifiedOn",
lowerBound = lowerTimestamp,
upperBound = upperTimestamp,
numPartitions = 20,
connectionProperties = new Properties()
)
Apache Spark 测试套件中有更多示例:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
我有一个要求,我需要每 5 分钟从多个源系统(Mysql 个实例)获取数据,并用其他数据(假设存在于 S3 中)加入并丰富它们。
我想在 Spark 中进行此处理以将我的执行分配给多个执行程序。
主要问题是每次我在 Mysql 中进行查找时,我只想获取最新记录(比如使用 lastModifiedOn > 时间戳)。 这种 MySql 行的选择性提取如何有效地发生? 这是我试过的:
val filmDf = sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/sakila")
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "")
.load()
您应该将 spark sql 与 jdbc 数据源一起使用。我给你举个例子。
val res = spark.read.jdbc(
url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb",
table = "TEST.table",
columnName = "lastModifiedOn",
lowerBound = lowerTimestamp,
upperBound = upperTimestamp,
numPartitions = 20,
connectionProperties = new Properties()
)
Apache Spark 测试套件中有更多示例:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala