如何使用 Apache Spark Scala 获取大型 CSV / RDD [Array [double]] 中所有列的直方图?
How to get Histogram of all columns in a large CSV / RDD[Array[double]] using Apache Spark Scala?
我正在尝试使用 Spark Scala 计算 CSV 文件中所有列的直方图。
我发现 DoubleRDDFunctions 支持直方图。
所以我编码如下以获得所有列的直方图。
- 获取列数
创建每列的 RDD[double]
并使用 DoubleRDDFunctions
计算每个 RDD
的直方图
var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1)
val histogramData = columnIndexArray.map(columns => {
rdd.map(lines => lines(columns)).histogram(6)
})
这样好吗?
谁能建议一些更好的方法来解决这个问题?
提前致谢。
(scala api) 转换,countByValue 应该做你想做的事
例如,为您的 RDD 中的第一列生成直方图数据:
val histCol1 = RDD.map(record => record.col_1).countByValue()
在上面的表达式中,record只是引用了RDD中的一个数据行,一个caseclass的实例,它有一个字段col_1
因此 histCol1 将 return 散列 table (Scala Map),其中键是第 1 列中的唯一值 (col_1) 并且值显然是每个唯一值的频率
不是更好,但替代方法是将 RDD 转换为 DataFrame 并使用 histogram_numeric
UDF。
示例数据:
import scala.util.Random
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{callUDF, lit, col}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
Random.setSeed(1)
val ncol = 5
val rdd = sc.parallelize((1 to 1000).map(
_ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
))
val schema = StructType(
(1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("df")
查询:
val nBuckets = 3
val columns = df.columns.map(
c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
val histograms = df.select(columns: _*)
histograms.printSchema
// root
// |-- x1: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x2: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x4: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x5: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
histograms.select($"x1").collect()
// Array([WrappedArray([0.16874313309969038,334.0],
// [0.513382068667877,345.0], [0.8421388886903808,321.0])])
我正在尝试使用 Spark Scala 计算 CSV 文件中所有列的直方图。
我发现 DoubleRDDFunctions 支持直方图。 所以我编码如下以获得所有列的直方图。
- 获取列数
创建每列的
计算每个RDD[double]
并使用DoubleRDDFunctions
RDD
的直方图var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1) val histogramData = columnIndexArray.map(columns => { rdd.map(lines => lines(columns)).histogram(6) })
这样好吗? 谁能建议一些更好的方法来解决这个问题?
提前致谢。
(scala api) 转换,countByValue 应该做你想做的事
例如,为您的 RDD 中的第一列生成直方图数据:
val histCol1 = RDD.map(record => record.col_1).countByValue()
在上面的表达式中,record只是引用了RDD中的一个数据行,一个caseclass的实例,它有一个字段col_1
因此 histCol1 将 return 散列 table (Scala Map),其中键是第 1 列中的唯一值 (col_1) 并且值显然是每个唯一值的频率
不是更好,但替代方法是将 RDD 转换为 DataFrame 并使用 histogram_numeric
UDF。
示例数据:
import scala.util.Random
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{callUDF, lit, col}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
Random.setSeed(1)
val ncol = 5
val rdd = sc.parallelize((1 to 1000).map(
_ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
))
val schema = StructType(
(1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("df")
查询:
val nBuckets = 3
val columns = df.columns.map(
c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
val histograms = df.select(columns: _*)
histograms.printSchema
// root
// |-- x1: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x2: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x4: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x5: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
histograms.select($"x1").collect()
// Array([WrappedArray([0.16874313309969038,334.0],
// [0.513382068667877,345.0], [0.8421388886903808,321.0])])