MySQL 使用 PySpark 阅读
MySQL read with PySpark
我有以下测试代码:
from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
print('Created spark context!')
if __name__ == '__main__':
df = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost/mysql",
driver="com.mysql.jdbc.Driver",
dbtable="users",
user="user",
password="****",
properties={"driver": 'com.mysql.jdbc.Driver'}
).load()
print(df)
当我 运行 它时,出现以下错误:
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
在 Scala 中,这是通过将 .jar mysql-connector-java
导入项目来解决的。
但是,在 python 中,我不知道如何告诉 pyspark 模块 link mysql-连接器文件。
我已经通过
等示例解决了这个问题
spark --package=mysql-connector-java testfile.py
但我不想要这个,因为它迫使我以一种奇怪的方式 运行 我的脚本。我想要一个完整的 python 解决方案或将文件复制到某处,或者向路径添加一些内容。
在初始化 SparkConf
之前创建 sparkContext
时,您可以将参数传递给 spark-submit
:
import os
from pyspark import SparkConf, SparkContext
SUBMIT_ARGS = "--packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf()
sc = SparkContext(conf=conf)
或者您可以将它们添加到您的 $SPARK_HOME/conf/spark-defaults.conf
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("Word Count")\
.config("spark.driver.extraClassPath", "/home/tuhin/mysql.jar")\
.getOrCreate()
dataframe_mysql = spark.read\
.format("jdbc")\
.option("url", "jdbc:mysql://localhost/database_name")\
.option("driver", "com.mysql.jdbc.Driver")\
.option("dbtable", "employees").option("user", "root")\
.option("password", "12345678").load()
print(dataframe_mysql.columns)
"/home/tuhin/mysql.jar" 是 mysql jar 文件的位置
如果您正在使用 pycharm 并且想要逐行 运行 而不是通过 spark-submit 提交您的 .py,您可以将您的 .jar 复制到 c:\spark\jars \ 你的代码可能是这样的:
from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
sqlContext = sql.SQLContext(sc)
source_df = sqlContext.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/database1',
driver='com.mysql.cj.jdbc.Driver', #com.mysql.jdbc.Driver
dbtable='table1',
user='root',
password='****').load()
print (source_df)
source_df.show()
我有以下测试代码:
from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
print('Created spark context!')
if __name__ == '__main__':
df = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://localhost/mysql",
driver="com.mysql.jdbc.Driver",
dbtable="users",
user="user",
password="****",
properties={"driver": 'com.mysql.jdbc.Driver'}
).load()
print(df)
当我 运行 它时,出现以下错误:
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
在 Scala 中,这是通过将 .jar mysql-connector-java
导入项目来解决的。
但是,在 python 中,我不知道如何告诉 pyspark 模块 link mysql-连接器文件。
我已经通过
等示例解决了这个问题spark --package=mysql-connector-java testfile.py
但我不想要这个,因为它迫使我以一种奇怪的方式 运行 我的脚本。我想要一个完整的 python 解决方案或将文件复制到某处,或者向路径添加一些内容。
在初始化 SparkConf
之前创建 sparkContext
时,您可以将参数传递给 spark-submit
:
import os
from pyspark import SparkConf, SparkContext
SUBMIT_ARGS = "--packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf()
sc = SparkContext(conf=conf)
或者您可以将它们添加到您的 $SPARK_HOME/conf/spark-defaults.conf
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("Word Count")\
.config("spark.driver.extraClassPath", "/home/tuhin/mysql.jar")\
.getOrCreate()
dataframe_mysql = spark.read\
.format("jdbc")\
.option("url", "jdbc:mysql://localhost/database_name")\
.option("driver", "com.mysql.jdbc.Driver")\
.option("dbtable", "employees").option("user", "root")\
.option("password", "12345678").load()
print(dataframe_mysql.columns)
"/home/tuhin/mysql.jar" 是 mysql jar 文件的位置
如果您正在使用 pycharm 并且想要逐行 运行 而不是通过 spark-submit 提交您的 .py,您可以将您的 .jar 复制到 c:\spark\jars \ 你的代码可能是这样的:
from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
sqlContext = sql.SQLContext(sc)
source_df = sqlContext.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/database1',
driver='com.mysql.cj.jdbc.Driver', #com.mysql.jdbc.Driver
dbtable='table1',
user='root',
password='****').load()
print (source_df)
source_df.show()