计算多列的众数
Calculate a mode for multiple columns
我想在 Spark 中同时计算多列的模式,并使用此计算值来估算 DataFrame 中的缺失。我找到了如何计算例如一个意思,但我认为模式更复杂。
这里是平均计算:
val multiple_mean = df.na.fill(df.columns.zip(
df.select(intVars.map(mean(_)): _*).first.toSeq
).toMap)
我可以用蛮力的方式计算模式:
var list = ArrayBuffer.empty[Float]
for(column <- df.columns){
list += df.select(column).groupBy(col(column)).count().orderBy(desc("count")).first.toSeq(0).asInstanceOf[Float]
}
val multiple_mode = df.na.fill(df.columns.zip(list.toSeq).toMap)
如果考虑性能的话,什么方式最好?
感谢您的帮助。
您可以使用 UserDefinedAggregateFunction。以下代码在 spark 1.6.2
中测试
首先创建一个扩展 UserDefinedAggregateFunction 的 class。
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class ModeUDAF extends UserDefinedAggregateFunction{
override def dataType: DataType = StringType
override def inputSchema: StructType = new StructType().add("input", StringType)
override def deterministic: Boolean = true
override def bufferSchema: StructType = new StructType().add("mode", MapType(StringType, LongType))
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Map.empty[Any, Long]
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val buff0 = buffer.getMap[Any, Long](0)
val inp = input.get(0)
buffer(0) = buff0.updated(inp, buff0.getOrElse(inp, 0L) + 1L)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val mp1 = buffer1.getMap[Any, Long](0)
val mp2 = buffer2.getMap[Any, Long](0)
buffer1(0) = mp1 ++ mp2.map { case (k, v) => k -> (v + mp1.getOrElse(k, 0L)) }
}
override def evaluate(buffer: Row): Any = {
lazy val st = buffer.getMap[Any, Long](0).toStream
val mode = st.foldLeft(st.head){case (e, s) => if (s._2 > e._2) s else e}
mode._1
}
}
之后您可以通过以下方式将它与您的数据框一起使用。
val modeColumnList = List("some", "column", "names") // or df.columns.toList
val modeAgg = new ModeUDAF()
val aggCols = modeColumnList.map(c => modeAgg(df(c)))
val aggregatedModeDF = df.agg(aggCols.head, aggCols.tail: _*)
aggregatedModeDF.show()
您也可以在最终数据帧上使用 .collect 以将结果收集到 scala 数据结构中。
注意:此解决方案的性能取决于输入列的基数。
我想在 Spark 中同时计算多列的模式,并使用此计算值来估算 DataFrame 中的缺失。我找到了如何计算例如一个意思,但我认为模式更复杂。
这里是平均计算:
val multiple_mean = df.na.fill(df.columns.zip(
df.select(intVars.map(mean(_)): _*).first.toSeq
).toMap)
我可以用蛮力的方式计算模式:
var list = ArrayBuffer.empty[Float]
for(column <- df.columns){
list += df.select(column).groupBy(col(column)).count().orderBy(desc("count")).first.toSeq(0).asInstanceOf[Float]
}
val multiple_mode = df.na.fill(df.columns.zip(list.toSeq).toMap)
如果考虑性能的话,什么方式最好?
感谢您的帮助。
您可以使用 UserDefinedAggregateFunction。以下代码在 spark 1.6.2
中测试首先创建一个扩展 UserDefinedAggregateFunction 的 class。
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class ModeUDAF extends UserDefinedAggregateFunction{
override def dataType: DataType = StringType
override def inputSchema: StructType = new StructType().add("input", StringType)
override def deterministic: Boolean = true
override def bufferSchema: StructType = new StructType().add("mode", MapType(StringType, LongType))
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Map.empty[Any, Long]
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val buff0 = buffer.getMap[Any, Long](0)
val inp = input.get(0)
buffer(0) = buff0.updated(inp, buff0.getOrElse(inp, 0L) + 1L)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val mp1 = buffer1.getMap[Any, Long](0)
val mp2 = buffer2.getMap[Any, Long](0)
buffer1(0) = mp1 ++ mp2.map { case (k, v) => k -> (v + mp1.getOrElse(k, 0L)) }
}
override def evaluate(buffer: Row): Any = {
lazy val st = buffer.getMap[Any, Long](0).toStream
val mode = st.foldLeft(st.head){case (e, s) => if (s._2 > e._2) s else e}
mode._1
}
}
之后您可以通过以下方式将它与您的数据框一起使用。
val modeColumnList = List("some", "column", "names") // or df.columns.toList
val modeAgg = new ModeUDAF()
val aggCols = modeColumnList.map(c => modeAgg(df(c)))
val aggregatedModeDF = df.agg(aggCols.head, aggCols.tail: _*)
aggregatedModeDF.show()
您也可以在最终数据帧上使用 .collect 以将结果收集到 scala 数据结构中。
注意:此解决方案的性能取决于输入列的基数。