通过 Spark 进行批量数据迁移 SQL

Bulk data migration through Spark SQL

我目前正在尝试通过 Spark SQL 将非常大的 MySQL table 的内容批量迁移到 parquet 文件中。但是当这样做时,我很快 运行 内存不足,即使将驱动程序的内存限制设置得更高(我在本地模式下使用 spark)。示例代码:

Dataset<Row> ds = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "bigdatatable")
    .option("user", "root")
    .option("password", "foobar")
    .load();

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");

似乎 Spark 试图将整个 table 内容读入内存,但效果不会很好。那么,通过 Spark SQL 进行批量数据迁移的最佳方法是什么?

在您的解决方案中,Spark 会在开始写入之前将整个 table 内容读入一个分区。避免这种情况的一种方法是对读取部分进行分区,但它需要源数据中的数字顺序列:

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "bigdatatable")
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "64")
  .load();

在上面的示例中,列 "NUMERIC_COL" 必须存在于数据中,理想情况下,它应该在 1 到 10000 之间统一变化。当然,这是很多要求,像这样的列将可能不存在,所以你应该在数据库中创建一个带有这样一列的视图,或者你将它添加到查询中(注意我使用了通用的 SQL 语法,你必须适应你的 DBMS) :

String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo"

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", query)
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "0")
  .option("upperBound", "63")
  .option("numPartitions", "64")
  .load();