将 Pyspark 与 SQL 数据库一起使用的最佳方式
Best way to use Pyspark with SQL DB
我的 SQL 数据库有包含数百万条记录的表,其中一些有几亿条记录,我的主要 select 大约有 4000 行代码,但结构是这样的:
SELECT A.seq field1, field2, field3, field4,
(select field from tableX X... where A.seq = X.seq ...) field5,
(select field from tableY Y... where A.seq = Y.seq ...) field6,
(select field from tableN Z... where A.seq = Z.seq ...) field7,
field8, field9
FROM tableA A, tableB B, tableN N
WHERE A.seq = B.seq
AND A.req_seq = N.req_seq;
我的想法是做这样的事情:
# load the tables in the cluster separately
conf = SparkConf().setAppName("MyApp")
sc = SparkContext(master="local[*]", conf=conf)
sql = HiveContext(sc)
dataframeA = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableA)\
.option("user", "myuser")\
.option("password", "mypass").load()
dataframeB = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableC)\
.option("user", "myuser")\
.option("password", "mypass").load()
dataframeC = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableC)\
.option("user", "myuser")\
.option("password", "mypass").load()
# then do the needed joins
df_aux = dataframeA.join(dataframeB, dataframeA.seq == dataframeB.seq)
df_res_aux = df_aux.join(dataframeC, df_aux.req_seq == dataframeC.req_seq)
# then with that dataframe calculate the subselect fields
def calculate_field5(seq):
# load the table in the cluster as with the main tables
# and query the datafame
# or make the query to DB and return the field
return field
df_res = df_res_aux.withColumn('field5', calculate_field5(df_res_aux.seq))
# the same for the rest of fields
这样好吗?
我应该换一种方式吗?
非常非常感谢任何建议
嗯,
如果你想在你的执行中使用 MySql,这是做到这一点的方法。
但是请注意,由于 mySql 查询时间,您的执行可能会花费很多时间到 运行。 MySql 不是分布式数据库,因此您可以花很多时间从 mySql.
中检索数据
我给你的建议。
尝试将数据检索到 hdfs(如果您有 HDFS),尝试使用 Sqoop for that. Here 一个如何以增量方式使用它的示例。
尝试将存储的数据转换为Orc. See the example here。
这个建议是为了减少数据库的执行时间。每次您直接从 MySql 请求数据时。您正在使用 MySql 的资源将数据发送到 Spark。按照我建议的方式,您可以将数据库复制到 HDFS,并将此数据带到 Spark 进行处理。这不会导致您的数据库执行时间。
为什么要使用兽人? Orc 是将数据转换为紧凑的柱状结构的不错选择。这将增加您的数据检索和搜索。
我的 SQL 数据库有包含数百万条记录的表,其中一些有几亿条记录,我的主要 select 大约有 4000 行代码,但结构是这样的:
SELECT A.seq field1, field2, field3, field4,
(select field from tableX X... where A.seq = X.seq ...) field5,
(select field from tableY Y... where A.seq = Y.seq ...) field6,
(select field from tableN Z... where A.seq = Z.seq ...) field7,
field8, field9
FROM tableA A, tableB B, tableN N
WHERE A.seq = B.seq
AND A.req_seq = N.req_seq;
我的想法是做这样的事情:
# load the tables in the cluster separately
conf = SparkConf().setAppName("MyApp")
sc = SparkContext(master="local[*]", conf=conf)
sql = HiveContext(sc)
dataframeA = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableA)\
.option("user", "myuser")\
.option("password", "mypass").load()
dataframeB = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableC)\
.option("user", "myuser")\
.option("password", "mypass").load()
dataframeC = sql.read.format("jdbc").option("url",
"db_url")\
.option("driver", "myDriver")\
.option("dbtable", tableC)\
.option("user", "myuser")\
.option("password", "mypass").load()
# then do the needed joins
df_aux = dataframeA.join(dataframeB, dataframeA.seq == dataframeB.seq)
df_res_aux = df_aux.join(dataframeC, df_aux.req_seq == dataframeC.req_seq)
# then with that dataframe calculate the subselect fields
def calculate_field5(seq):
# load the table in the cluster as with the main tables
# and query the datafame
# or make the query to DB and return the field
return field
df_res = df_res_aux.withColumn('field5', calculate_field5(df_res_aux.seq))
# the same for the rest of fields
这样好吗? 我应该换一种方式吗?
非常非常感谢任何建议
嗯,
如果你想在你的执行中使用 MySql,这是做到这一点的方法。
但是请注意,由于 mySql 查询时间,您的执行可能会花费很多时间到 运行。 MySql 不是分布式数据库,因此您可以花很多时间从 mySql.
中检索数据我给你的建议。
尝试将数据检索到 hdfs(如果您有 HDFS),尝试使用 Sqoop for that. Here 一个如何以增量方式使用它的示例。
尝试将存储的数据转换为Orc. See the example here。
这个建议是为了减少数据库的执行时间。每次您直接从 MySql 请求数据时。您正在使用 MySql 的资源将数据发送到 Spark。按照我建议的方式,您可以将数据库复制到 HDFS,并将此数据带到 Spark 进行处理。这不会导致您的数据库执行时间。
为什么要使用兽人? Orc 是将数据转换为紧凑的柱状结构的不错选择。这将增加您的数据检索和搜索。