scala - 用于激发 Dataframe 的结果集
scala - Resultset to spark Dataframe
我正在查询 mysql table
val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"
val driver = "com.mysql.jdbc.Driver"
val username = "XXX"
val password = "XXX"
var connection:Connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val patnerName = statement.executeQuery("SELECT id,name FROM partner")
我确实在 patnerName
中得到了我的结果,但我需要转换为 Dataframe。
我可以通过以下代码打印数据:
while (patnerName.next) {
val id = patnerName.getString("id")
val name = patnerName.getString("name")
println("id = %s, name = %s".format(id,name))
}
现在如何将 patnerName
转换为 DataFrame?
直接使用 Spark 功能怎么样?
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
.option("dbtable", "pg_partner")
.option("user", "XXX")
.option("password", "XXX")
.load()
代码取自here。
因此您必须分几步完成:
- 定义列并准备架构
val columns = Seq("id", "name")
val schema = StructType(List(
StructField("id", StringType, nullable = true),
StructField("name", StringType, nullable = true)
))
- 定义在每次迭代时如何将 ResultSet 中的每条记录转换为一行
def parseResultSet(rs: ResultSet): Row = {
val resultSetRecord = columns.map(c => rs.getString(c))
Row(resultSetRecord:_*)
}
- 定义一个函数将您的 ResultSet 转换为 Iterator[Row]。它将使用您在上一步中定义的函数(当您在下一步中调用它时)。
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
new Iterator[Row] {
def hasNext: Boolean = rs.next()
def next(): Row = f(rs)
}
- 定义一个函数,该函数从 Iterator[Row].toSeq 中创建 RDD,该 Iterator[Row].toSeq 使用您在上一步中定义的函数。使用模式从 RDD
创建 DataFrame
def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
spark.createDataFrame(rdd, schema) // use the schema you defined in step 1
}
- 最后调用你的函数
val df: DataFrame = parallelizeResultSet(patner, spark)
我正在查询 mysql table
val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"
val driver = "com.mysql.jdbc.Driver"
val username = "XXX"
val password = "XXX"
var connection:Connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val patnerName = statement.executeQuery("SELECT id,name FROM partner")
我确实在 patnerName
中得到了我的结果,但我需要转换为 Dataframe。
我可以通过以下代码打印数据:
while (patnerName.next) {
val id = patnerName.getString("id")
val name = patnerName.getString("name")
println("id = %s, name = %s".format(id,name))
}
现在如何将 patnerName
转换为 DataFrame?
直接使用 Spark 功能怎么样?
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
.option("dbtable", "pg_partner")
.option("user", "XXX")
.option("password", "XXX")
.load()
代码取自here。
因此您必须分几步完成:
- 定义列并准备架构
val columns = Seq("id", "name")
val schema = StructType(List(
StructField("id", StringType, nullable = true),
StructField("name", StringType, nullable = true)
))
- 定义在每次迭代时如何将 ResultSet 中的每条记录转换为一行
def parseResultSet(rs: ResultSet): Row = {
val resultSetRecord = columns.map(c => rs.getString(c))
Row(resultSetRecord:_*)
}
- 定义一个函数将您的 ResultSet 转换为 Iterator[Row]。它将使用您在上一步中定义的函数(当您在下一步中调用它时)。
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
new Iterator[Row] {
def hasNext: Boolean = rs.next()
def next(): Row = f(rs)
}
- 定义一个函数,该函数从 Iterator[Row].toSeq 中创建 RDD,该 Iterator[Row].toSeq 使用您在上一步中定义的函数。使用模式从 RDD 创建 DataFrame
def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
spark.createDataFrame(rdd, schema) // use the schema you defined in step 1
}
- 最后调用你的函数
val df: DataFrame = parallelizeResultSet(patner, spark)