How to insert a table into Hive with the PySpark API in Spark 2.4.0
I need to insert data into a table in Hive. FYI, the table already exists in Hive. Here is my code:
from pyspark.sql import SparkSession as sc, HiveContext as HC
spark = sc.builder.appName('eap').enableHiveSupport().getOrCreate()
sqlContext = HC(spark)
sqlContext.sql("INSERT INTO mydb.my_job_status_table
SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime
FROM batches b
inner join sourcetables st on b.rungroup = st.rungroup
inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid
inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid
WHERE b.rungroup like 'sgb_%'")
After I run the code in Spark, I get the following error:
raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input '01' expecting <EOF>(line 1, pos 195)\n\n== SQL ==\nINSERT INTO mydb.my_job_status_table...
What am I doing wrong? And what is the difference between SQLContext and spark.sql?
Your problem is not specific to PySpark.
Don't use INSERT INTO in Spark SQL here.
First, build a dataset with the SELECT:
dataset = sqlContext.sql("""
    SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime
    FROM batches b
    inner join sourcetables st on b.rungroup = st.rungroup
    inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid
    inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid
    WHERE b.rungroup like 'sgb_%'
""")
Then write it into the Hive table with insertInto. In PySpark 2.x this method lives on the DataFrameWriter, so call it through .write:
dataset.write.insertInto("mydb.my_job_status_table")
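If you need to control whether existing rows are kept, the writer also accepts a save mode or an overwrite flag; a minimal sketch, assuming mydb.my_job_status_table already exists and its column order matches the SELECT:
# insertInto matches columns by position, so the SELECT above must follow
# the column order of mydb.my_job_status_table
dataset.write.mode("append").insertInto("mydb.my_job_status_table")
# or replace the existing rows instead of appending:
# dataset.write.insertInto("mydb.my_job_status_table", overwrite=True)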
Try this:
spark = sc.builder.appName('eap').enableHiveSupport().getOrCreate()
spark.sql("INSERT INTO mydb.my_job_status_table " +
"SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime " +
"FROM batches b " +
"inner join sourcetables st on b.rungroup = st.rungroup " +
"inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid " +
"inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid " +
"WHERE b.rungroup like 'sgb_%'")