Factorize Spark column
Is it possible to factorize a Spark dataframe column? By factorizing I mean creating a mapping of each unique value in the column to a consistent ID.
Example, the original dataframe:
+----------+----------------+--------------------+
| col1| col2| col3|
+----------+----------------+--------------------+
|1473490929|4060600988513370| A|
|1473492972|4060600988513370| A|
|1473509764|4060600988513370| B|
|1473513432|4060600988513370| C|
|1473513432|4060600988513370| A|
+----------+----------------+--------------------+
and the factorized version:
+----------+----------------+--------------------+
| col1| col2| col3|
+----------+----------------+--------------------+
|1473490929|4060600988513370| 0|
|1473492972|4060600988513370| 0|
|1473509764|4060600988513370| 1|
|1473513432|4060600988513370| 2|
|1473513432|4060600988513370| 0|
+----------+----------------+--------------------+
In plain Scala this would be fairly simple, but since Spark distributes its dataframes across the nodes, I am not sure how to keep the mapping A->0, B->1, C->2 consistent.
Also, assume the dataframe is pretty big (gigabytes), which means it might not be possible to load one entire column into the memory of a single machine.
Can it be done?
You can use StringIndexer to encode the letters into indices:
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("col3")
  .setOutputCol("col3Index")

val indexed = indexer.fit(df).transform(df)
indexed.show()
+----------+----------------+----+---------+
| col1| col2|col3|col3Index|
+----------+----------------+----+---------+
|1473490929|4060600988513370| A| 0.0|
|1473492972|4060600988513370| A| 0.0|
|1473509764|4060600988513370| B| 1.0|
|1473513432|4060600988513370| C| 2.0|
|1473513432|4060600988513370| A| 0.0|
+----------+----------------+----+---------+
The data:
val df = spark.createDataFrame(Seq(
  (1473490929, "4060600988513370", "A"),
  (1473492972, "4060600988513370", "A"),
  (1473509764, "4060600988513370", "B"),
  (1473513432, "4060600988513370", "C"),
  (1473513432, "4060600988513370", "A")
)).toDF("col1", "col2", "col3")
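If you want col3 itself replaced by integer IDs, as in the desired output in the question, a minimal follow-up sketch (my own addition, reusing the indexed dataframe from above) is to cast the index to an integer and drop the helper column:

import org.apache.spark.sql.functions.col

// Replace col3 with the integer-valued index produced by StringIndexer,
// then drop the intermediate col3Index column.
val factorized = indexed
  .withColumn("col3", col("col3Index").cast("int"))
  .drop("col3Index")

factorized.show()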
Alternatively, you can use a user-defined function. First create the mapping you need:
import org.apache.spark.sql.functions.udf

val updateFunction = udf { (x: String) =>
  x match {
    case "A" => 0
    case "B" => 1
    case "C" => 2
    case _   => 3
  }
}
Now you just apply it to your DataFrame:
df.withColumn("col3", updateFunction(df.col("col3")))
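Note that this UDF requires knowing every value up front. If the values are not known in advance, one option (a sketch of my own, not part of the answer above) is to derive the mapping from the data with distinct and zipWithIndex and join it back, so the mapping never has to be held on the driver as a Scala Map:

import org.apache.spark.sql.functions.col
import spark.implicits._

// Assign a sequential id to each distinct value of col3 without collecting
// the values to the driver, then join the mapping back onto the original data.
val mapping = df.select("col3").distinct().rdd
  .map(_.getString(0))
  .zipWithIndex()
  .toDF("col3", "col3Id")

val factorized = df.join(mapping, Seq("col3"))
  .select(col("col1"), col("col2"), col("col3Id").cast("int").as("col3"))

factorized.show()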