通过 Spark 进行批量数据迁移 SQL
Bulk data migration through Spark SQL
我目前正在尝试通过 Spark SQL 将非常大的 MySQL table 的内容批量迁移到 parquet 文件中。但是当这样做时,我很快 运行 内存不足,即使将驱动程序的内存限制设置得更高(我在本地模式下使用 spark)。示例代码:
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "bigdatatable")
.option("user", "root")
.option("password", "foobar")
.load();
ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");
似乎 Spark 试图将整个 table 内容读入内存,但效果不会很好。那么,通过 Spark SQL 进行批量数据迁移的最佳方法是什么?
在您的解决方案中,Spark 会在开始写入之前将整个 table 内容读入一个分区。避免这种情况的一种方法是对读取部分进行分区,但它需要源数据中的数字顺序列:
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "bigdatatable")
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "1")
.option("upperBound", "10000")
.option("numPartitions", "64")
.load();
在上面的示例中,列 "NUMERIC_COL" 必须存在于数据中,理想情况下,它应该在 1 到 10000 之间统一变化。当然,这是很多要求,像这样的列将可能不存在,所以你应该在数据库中创建一个带有这样一列的视图,或者你将它添加到查询中(注意我使用了通用的 SQL 语法,你必须适应你的 DBMS) :
String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo"
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", query)
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "0")
.option("upperBound", "63")
.option("numPartitions", "64")
.load();
我目前正在尝试通过 Spark SQL 将非常大的 MySQL table 的内容批量迁移到 parquet 文件中。但是当这样做时,我很快 运行 内存不足,即使将驱动程序的内存限制设置得更高(我在本地模式下使用 spark)。示例代码:
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "bigdatatable")
.option("user", "root")
.option("password", "foobar")
.load();
ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");
似乎 Spark 试图将整个 table 内容读入内存,但效果不会很好。那么,通过 Spark SQL 进行批量数据迁移的最佳方法是什么?
在您的解决方案中,Spark 会在开始写入之前将整个 table 内容读入一个分区。避免这种情况的一种方法是对读取部分进行分区,但它需要源数据中的数字顺序列:
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "bigdatatable")
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "1")
.option("upperBound", "10000")
.option("numPartitions", "64")
.load();
在上面的示例中,列 "NUMERIC_COL" 必须存在于数据中,理想情况下,它应该在 1 到 10000 之间统一变化。当然,这是很多要求,像这样的列将可能不存在,所以你应该在数据库中创建一个带有这样一列的视图,或者你将它添加到查询中(注意我使用了通用的 SQL 语法,你必须适应你的 DBMS) :
String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo"
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", query)
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "0")
.option("upperBound", "63")
.option("numPartitions", "64")
.load();