使用 Scala,从 Java ResultSet 创建 DataFrame 或 RDD
Using Scala, create DataFrame or RDD from Java ResultSet
我不想直接使用 spark.read 方法来创建 DataFrame 或 RDD,而是想从一个 Java ResultSet(有 500,000 条记录)构建 DataFrame 或 RDD。如果您能提供高效的解决方案,我将不胜感激。
首先,可以使用 RowFactory 为结果集中的每条记录创建一个 Row;其次,可以使用 SQLContext.createDataFrame 方法将所有 Row 转换为 DataFrame。希望这对您有所帮助 :)
import java.sql.Connection
import java.sql.ResultSet
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
// Script: reads a two-column JDBC result set into memory, then turns the
// collected rows into a Spark DataFrame and prints it.
//
// SparkConf / SparkContext / ArrayBuffer are not part of the file's top-level
// imports, so they are brought into scope here.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer

// Accumulates one Spark Row per JDBC record. Mutable on purpose: a ResultSet
// is a forward-only cursor that can only be drained with a loop.
val rowList = ArrayBuffer.empty[Row]

// The result set comes from plain Java JDBC. `executeQuery` is required here:
// `execute` only returns a Boolean flag, not a ResultSet.
val statement = DbConnection.createStatement()
try {
  val resultSet: ResultSet = statement.executeQuery("Sql")
  try {
    while (resultSet.next()) {
      // Read the columns with typed getters so the values match the schema
      // returned by getSchema() (LongType, StringType). getObject may hand back
      // Integer/BigDecimal depending on the JDBC driver, which would fail when
      // the DataFrame is evaluated.
      // NOTE(review): getLong yields 0 for SQL NULL — the schema declares the
      // column non-nullable, confirm the query never returns NULL here.
      rowList += RowFactory.create(
        java.lang.Long.valueOf(resultSet.getLong(1)),
        resultSet.getString(2))
    }
  } finally {
    resultSet.close()
  }
} finally {
  // Always release the JDBC statement, even if reading fails.
  statement.close()
}

val sConf = new SparkConf()
  .setAppName("")
  .setMaster("local[*]")
val sContext: SparkContext = new SparkContext(sConf)
val sqlContext: SQLContext = new SQLContext(sContext)

// Distribute the collected rows over 2 partitions and attach the schema.
// `.toSeq` keeps this compiling on Scala 2.13, where `Seq` is immutable.
val DF: DataFrame =
  sqlContext.createDataFrame(sContext.parallelize(rowList.toSeq, 2), getSchema())
DF.show() // print the DataFrame contents
/**
 * Builds the schema used to give the DataFrame its column names and types.
 *
 * @return a two-field [[StructType]]: `COUNT` (long, non-nullable) followed by
 *         `TABLE_NAME` (string, non-nullable)
 */
def getSchema(): StructType = {
  // LongType is not among the file's top-level imports; import it locally so
  // the block compiles on its own.
  import org.apache.spark.sql.types.LongType

  StructType(
    StructField("COUNT", LongType, nullable = false) ::
      StructField("TABLE_NAME", StringType, nullable = false) :: Nil)
}
我不想直接使用 spark.read 方法来创建 DataFrame 或 RDD,而是想从一个 Java ResultSet(有 500,000 条记录)构建 DataFrame 或 RDD。如果您能提供高效的解决方案,我将不胜感激。
首先,可以使用 RowFactory 为结果集中的每条记录创建一个 Row;其次,可以使用 SQLContext.createDataFrame 方法将所有 Row 转换为 DataFrame。希望这对您有所帮助 :)
import java.sql.Connection
import java.sql.ResultSet
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
// Script: reads a two-column JDBC result set into memory, then turns the
// collected rows into a Spark DataFrame and prints it.
//
// SparkConf / SparkContext / ArrayBuffer are not part of the file's top-level
// imports, so they are brought into scope here.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer

// Accumulates one Spark Row per JDBC record. Mutable on purpose: a ResultSet
// is a forward-only cursor that can only be drained with a loop.
val rowList = ArrayBuffer.empty[Row]

// The result set comes from plain Java JDBC. `executeQuery` is required here:
// `execute` only returns a Boolean flag, not a ResultSet.
val statement = DbConnection.createStatement()
try {
  val resultSet: ResultSet = statement.executeQuery("Sql")
  try {
    while (resultSet.next()) {
      // Read the columns with typed getters so the values match the schema
      // returned by getSchema() (LongType, StringType). getObject may hand back
      // Integer/BigDecimal depending on the JDBC driver, which would fail when
      // the DataFrame is evaluated.
      // NOTE(review): getLong yields 0 for SQL NULL — the schema declares the
      // column non-nullable, confirm the query never returns NULL here.
      rowList += RowFactory.create(
        java.lang.Long.valueOf(resultSet.getLong(1)),
        resultSet.getString(2))
    }
  } finally {
    resultSet.close()
  }
} finally {
  // Always release the JDBC statement, even if reading fails.
  statement.close()
}

val sConf = new SparkConf()
  .setAppName("")
  .setMaster("local[*]")
val sContext: SparkContext = new SparkContext(sConf)
val sqlContext: SQLContext = new SQLContext(sContext)

// Distribute the collected rows over 2 partitions and attach the schema.
// `.toSeq` keeps this compiling on Scala 2.13, where `Seq` is immutable.
val DF: DataFrame =
  sqlContext.createDataFrame(sContext.parallelize(rowList.toSeq, 2), getSchema())
DF.show() // print the DataFrame contents
/**
 * Builds the schema used to give the DataFrame its column names and types.
 *
 * @return a two-field [[StructType]]: `COUNT` (long, non-nullable) followed by
 *         `TABLE_NAME` (string, non-nullable)
 */
def getSchema(): StructType = {
  // LongType is not among the file's top-level imports; import it locally so
  // the block compiles on its own.
  import org.apache.spark.sql.types.LongType

  StructType(
    StructField("COUNT", LongType, nullable = false) ::
      StructField("TABLE_NAME", StringType, nullable = false) :: Nil)
}