Spark SQL error while querying SQL Server table
I get an error while trying to query a SQL Server table, please help.
Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).;
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val querytest = sqlContext.sql(query)

val prop = new Properties()
val url2 = "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014"
prop.setProperty("user", "admin")
prop.setProperty("password", "oracle")

val test = sqlContext.read.jdbc(url2, "Customer", prop)
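The exception is raised because the table name handed to Spark SQL carries a database/schema qualifier (a dot), which is not allowed for temporary tables. A minimal sketch of the workaround, assuming the failing query referenced something like AdventureWorks2014.dbo.Customer (the exact query is not shown here and the column names below are illustrative): pass either the bare table name or a parenthesised subquery with an alias as the second argument to read.jdbc, and keep dots out of any temp-table name.

// Sketch only, column names assumed
val customers = sqlContext.read.jdbc(
  url2,
  "(select CustomerID, StoreID, TerritoryID, AccountNumber from Customer) as customer", // subquery alias, no dots
  prop)

customers.registerTempTable("customer") // temp table name must not contain '.'
sqlContext.sql("select * from customer").show(10)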
I made the following changes to the code, which now works:-
package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class SparkSqlValueClassMPP(driver: String, url: String, username: String, password: String,
                                 table: String, opdelimeter: String, lowerbound: String, upperbound: String,
                                 numberofparitions: String, parallelizecolumn: String)

object SparkDBExtractorMPP {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkDBExtractorMPP")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    // convert a DataFrame into delimited text lines
    def opfile(value: DataFrame, delimeter: String): RDD[String] = {
      value.map(x => x.toString.replace("[", "").replace("]", "").replace(",", delimeter))
    }

    // read the application context file
    val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
    val DBinfo = ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val table = DBinfo.table
    val opdelimeter = DBinfo.opdelimeter
    val lowerbound = DBinfo.lowerbound.toInt
    val upperbound = DBinfo.upperbound.toInt
    val numberofpartitions = DBinfo.numberofparitions.toInt
    val parallelizecolumn = DBinfo.parallelizecolumn

    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))
    println("Lowerbound:-%s".format(lowerbound))
    println("Upperbound:-%s".format(upperbound))
    println("Numberofpartitions:-%s".format(numberofpartitions))
    println("Parallelizecolumn:-%s".format(parallelizecolumn))

    try {
      val props = new Properties()
      props.put("user", username)
      props.put("password", password)
      props.put("driver", driver)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // partitioned JDBC read: parallelizecolumn is split into numberofpartitions ranges
      // between lowerbound and upperbound
      val df = sqlContext.read.jdbc(url, table, parallelizecolumn, lowerbound, upperbound, numberofpartitions, props)
      df.show(10)

      // backslashes must be escaped in a Scala string literal
      opfile(df, opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")
    } catch {
      case e: Exception => e.printStackTrace()
    }

    sc.stop()
  }
}
I am using Spring beans to make the Spark code configurable.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
    "http://www.springframework.org/dtd/spring-beans.dtd">

<beans>

    <bean id="queryProps" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    </bean>

    <bean id="SparkSQLDBExtractorMPP" class="com.kali.db.SparkSqlValueClassMPP">
        <constructor-arg value="com.microsoft.sqlserver.jdbc.SQLServerDriver" />
        <constructor-arg value="jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014" />
        <constructor-arg value="admin" />
        <constructor-arg value="oracle" />
        <constructor-arg value="(select top 100 CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer ) as customer" />
        <constructor-arg value="~" />
        <constructor-arg value="1" />
        <constructor-arg value="100" />
        <constructor-arg value="8" />
        <constructor-arg value="CustomerID" />
    </bean>

</beans>
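For illustration only: the constructor-arg entries are matched positionally against the case class, so the bean above is equivalent to building the value object by hand in Scala (values copied from the XML):

val DBinfo = SparkSqlValueClassMPP(
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  url = "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014",
  username = "admin",
  password = "oracle",
  table = "(select top 100 CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer) as customer",
  opdelimeter = "~",
  lowerbound = "1",
  upperbound = "100",
  numberofparitions = "8",
  parallelizecolumn = "CustomerID")

Note that the table value is a parenthesised subquery with an alias rather than a dot-qualified name, which is what avoids the temporary-table error from the question.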
If you are using Spark 1.4, the load function is deprecated.
load

public DataFrame load(String path, String source)

Deprecated. As of 1.4.0, replaced by read().format(source).load(path).

Returns the dataset stored at path as a DataFrame, using the given data source.

Parameters:
path - (undocumented)
source - (undocumented)
Returns:
(undocumented)
Here is the new approach: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/
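A minimal sketch of that newer Spark 1.4+ style, reusing the connection details from above (the dbtable subquery here is illustrative):

val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("dbtable", "(select top 100 CustomerID, StoreID from Customer) as customer")
  .load()

df.show(10)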