Spark 从行中提取值
Spark extracting values from a Row
我有以下数据框
val transactions_with_counts = sqlContext.sql(
"""SELECT user_id AS user_id, category_id AS category_id,
COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")
我正在尝试将行转换为 Rating 对象,但是由于 x(0) returns 一个数组,所以失败了
val ratings = transactions_with_counts
.map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))
error: value toInt is not a member of Any
让我们从一些虚拟数据开始:
val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")
val transactions_with_counts = transactions
.groupBy($"user_id", $"category_id")
.count
transactions_with_counts.printSchema
// root
// |-- user_id: integer (nullable = false)
// |-- category_id: integer (nullable = false)
// |-- count: long (nullable = false)
有几种方法可以访问 Row
值并保持预期类型:
模式匹配
import org.apache.spark.sql.Row
transactions_with_counts.map{
case Row(user_id: Int, category_id: Int, rating: Long) =>
Rating(user_id, category_id, rating)
}
键入 get*
方法,例如 getInt
、getLong
:
transactions_with_counts.map(
r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
)
getAs
可以同时使用名称和索引的方法:
transactions_with_counts.map(r => Rating(
r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
))
它可用于正确提取用户定义的类型,包括mllib.linalg.Vector
。显然按名称访问需要模式。
转换为静态类型 Dataset
(Spark 1.6+ / 2.0+):
transactions_with_counts.as[(Int, Int, Long)]
使用数据集,您可以按如下方式定义评级:
case class Rating(user_id: Int, category_id:Int, count:Long)
这里的评级 class 有一个列名称 'count' 而不是 zero323 建议的 'rating'。因此评分变量分配如下:
val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count
val rating = transactions_with_counts.as[Rating]
这样你就不会 运行 在 Spark 中出现 运行 次错误,因为你的
评级 class 列名称与 Spark 在 运行 时间生成的 'count' 列名称相同。
我有以下数据框
val transactions_with_counts = sqlContext.sql(
"""SELECT user_id AS user_id, category_id AS category_id,
COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")
我正在尝试将行转换为 Rating 对象,但是由于 x(0) returns 一个数组,所以失败了
val ratings = transactions_with_counts
.map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))
error: value toInt is not a member of Any
让我们从一些虚拟数据开始:
val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")
val transactions_with_counts = transactions
.groupBy($"user_id", $"category_id")
.count
transactions_with_counts.printSchema
// root
// |-- user_id: integer (nullable = false)
// |-- category_id: integer (nullable = false)
// |-- count: long (nullable = false)
有几种方法可以访问 Row
值并保持预期类型:
模式匹配
import org.apache.spark.sql.Row transactions_with_counts.map{ case Row(user_id: Int, category_id: Int, rating: Long) => Rating(user_id, category_id, rating) }
键入
get*
方法,例如getInt
、getLong
:transactions_with_counts.map( r => Rating(r.getInt(0), r.getInt(1), r.getLong(2)) )
getAs
可以同时使用名称和索引的方法:transactions_with_counts.map(r => Rating( r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2) ))
它可用于正确提取用户定义的类型,包括
mllib.linalg.Vector
。显然按名称访问需要模式。转换为静态类型
Dataset
(Spark 1.6+ / 2.0+):transactions_with_counts.as[(Int, Int, Long)]
使用数据集,您可以按如下方式定义评级:
case class Rating(user_id: Int, category_id:Int, count:Long)
这里的评级 class 有一个列名称 'count' 而不是 zero323 建议的 'rating'。因此评分变量分配如下:
val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count
val rating = transactions_with_counts.as[Rating]
这样你就不会 运行 在 Spark 中出现 运行 次错误,因为你的 评级 class 列名称与 Spark 在 运行 时间生成的 'count' 列名称相同。