如何从同一个数据库中读取多个表并将它们保存到自己的 CSV 文件中?
How to read many tables from the same database and save them to their own CSV file?
下面是连接到 SQL 服务器的工作代码,并将 1 table 保存到 CSV 格式文件。
conf = new SparkConf().setAppName("test").setMaster("local").set("spark.driver.allowMultipleContexts", "true");
sc = new SparkContext(conf)
sqlContext = new SQLContext(sc)
df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://DBServer:PORT").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxx").option("password","xxxx").load()
df.registerTempTable("test")
df.write.format("com.databricks.spark.csv").save("poc/amitesh/csv")
exit()
我有一个场景,我必须通过 pyspark 代码以 CSV 格式一次将 4 个 table 从同一数据库保存到 4 个不同的文件中。有没有什么办法可以实现objective?或者,这些拆分是在 HDFS 块大小级别完成的,所以如果你有一个 300mb 的文件,并且 HDFS 块大小设置为 128,那么你会得到 3 个块,分别为 128mb、128mb 和 44mb?
where in I have to save 4 table from same database in CSV format in 4 different files at a time through pyspark code.
您必须为数据库中的每个 table 编写转换(读取和写入)代码(使用 sqlContext.read.format
)。
table 特定 ETL 管道之间的唯一区别是每个 table 的 dbtable
选项不同。拥有 DataFrame 后,将其保存到自己的 CSV 文件中。
代码可能如下所示(在 Scala 中,所以我将其转换为 Python 作为家庭练习):
val datasetFromTABLE_ONE: DataFrame = sqlContext.
read.
format("jdbc").
option("url","jdbc:sqlserver://DBServer:PORT").
option("databaseName","xxx").
option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").
option("dbtable","TABLE_ONE").
option("user","xxx").
option("password","xxxx").
load()
// save the dataset from TABLE_ONE into its own CSV file
datasetFromTABLE_ONE.write.csv("table_one.csv")
对每个 table 要保存到 CSV 的文件重复相同的代码。
完成!
100-table 个案例——公平调度
解决方案求另一个:
What when I have 100 or more tables? How to optimize the code for that? How to do it effectively in Spark? Any parallelization?
SparkContext
位于我们用于 ETL 管道的 SparkSession
之后是线程安全的,这意味着您可以从多个线程使用它。如果您考虑每个 table 一个线程,那是正确的方法。
您可以生成与 table 一样多的线程,比如说 100,然后启动它们。然后 Spark 可以决定执行什么以及何时执行。
这是 Spark 使用 Fair Scheduler Pools 所做的事情。这不是 Spark 的广为人知的特性,在这种情况下值得考虑:
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).
使用它,您的加载和保存管道可能会变得更快。
下面是连接到 SQL 服务器的工作代码,并将 1 table 保存到 CSV 格式文件。
conf = new SparkConf().setAppName("test").setMaster("local").set("spark.driver.allowMultipleContexts", "true");
sc = new SparkContext(conf)
sqlContext = new SQLContext(sc)
df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://DBServer:PORT").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxx").option("password","xxxx").load()
df.registerTempTable("test")
df.write.format("com.databricks.spark.csv").save("poc/amitesh/csv")
exit()
我有一个场景,我必须通过 pyspark 代码以 CSV 格式一次将 4 个 table 从同一数据库保存到 4 个不同的文件中。有没有什么办法可以实现objective?或者,这些拆分是在 HDFS 块大小级别完成的,所以如果你有一个 300mb 的文件,并且 HDFS 块大小设置为 128,那么你会得到 3 个块,分别为 128mb、128mb 和 44mb?
where in I have to save 4 table from same database in CSV format in 4 different files at a time through pyspark code.
您必须为数据库中的每个 table 编写转换(读取和写入)代码(使用 sqlContext.read.format
)。
table 特定 ETL 管道之间的唯一区别是每个 table 的 dbtable
选项不同。拥有 DataFrame 后,将其保存到自己的 CSV 文件中。
代码可能如下所示(在 Scala 中,所以我将其转换为 Python 作为家庭练习):
val datasetFromTABLE_ONE: DataFrame = sqlContext.
read.
format("jdbc").
option("url","jdbc:sqlserver://DBServer:PORT").
option("databaseName","xxx").
option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").
option("dbtable","TABLE_ONE").
option("user","xxx").
option("password","xxxx").
load()
// save the dataset from TABLE_ONE into its own CSV file
datasetFromTABLE_ONE.write.csv("table_one.csv")
对每个 table 要保存到 CSV 的文件重复相同的代码。
完成!
100-table 个案例——公平调度
解决方案求另一个:
What when I have 100 or more tables? How to optimize the code for that? How to do it effectively in Spark? Any parallelization?
SparkContext
位于我们用于 ETL 管道的 SparkSession
之后是线程安全的,这意味着您可以从多个线程使用它。如果您考虑每个 table 一个线程,那是正确的方法。
您可以生成与 table 一样多的线程,比如说 100,然后启动它们。然后 Spark 可以决定执行什么以及何时执行。
这是 Spark 使用 Fair Scheduler Pools 所做的事情。这不是 Spark 的广为人知的特性,在这种情况下值得考虑:
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).
使用它,您的加载和保存管道可能会变得更快。