Mysql Spark 中的数据处理

Mysql data processing in Spark

我有一个要求,我需要每 5 分钟从多个源系统(Mysql 个实例)获取数据,并用其他数据(假设存在于 S3 中)加入并丰富它们。

我想在 Spark 中进行此处理以将我的执行分配给多个执行程序。

主要问题是每次我在 Mysql 中进行查找时,我只想获取最新记录(比如使用 lastModifiedOn > 时间戳)。 这种 MySql 行的选择性提取如何有效地发生? 这是我试过的:

val filmDf = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/sakila")
  .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "")
  .load()

您应该将 spark sql 与 jdbc 数据源一起使用。我给你举个例子。

val res = spark.read.jdbc(
      url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb",
      table = "TEST.table",
      columnName = "lastModifiedOn",
      lowerBound = lowerTimestamp,
      upperBound = upperTimestamp,
      numPartitions = 20,
      connectionProperties = new Properties()
    )

Apache Spark 测试套件中有更多示例:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala