使用 apache spark 和 scala 进行数据预处理
Data preprocessing with apache spark and scala
我对 spark 和 scala 还很陌生,因此我对使用 spark 进行数据预处理和使用 rdds 有一些疑问。
我正在做一个小项目,我想用 spark 实现一个机器学习系统。我认为使用算法没问题,但我在预处理数据方面遇到了问题。
我有一个包含 30 列和大约一百万行的数据集。但为简单起见,假设我有以下数据集 (csv-file):
columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0
在 spark 中加载数据后,我想执行以下步骤:
- 删除所有以“_txt”结尾的列
- 过滤掉 columnB 为空的所有行(这个我已经知道了)
- 删除那些超过 9 级的列(这里是 columnA)
所以我对问题 1 和问题 3 有疑问。
我知道我不能删除列所以我必须创建一个新的 rdd 但如果没有某些列我该怎么做呢?
现在我在 spark 中加载没有 header 的 csv 文件,但为了我的任务,我需要这样做。是否建议将 header 加载到单独的 rdd 中?但是我怎样才能与那个 rdd 交互以找到正确的列呢?
抱歉,我知道很多问题,但我仍处于起步阶段,正在努力学习。
谢谢并致以最诚挚的问候,
克里斯
假设数据框加载了 headers 并且结构是扁平的:
val df = sqlContext.
read.
format("com.databricks.spark.csv").
option("header", "true").
load("data.csv")
像这样的东西应该可以工作:
import org.apache.spark.sql.DataFrame
def moreThan9(df: DataFrame, col: String) = {
df.agg(countDistinct(col)).first()(0) match {
case x: Long => x > 9L
case _ => false
}
}
val newDf = df.
schema. // Extract schema
toArray. // Convert to array
map(_.name). // Map to names
foldLeft(df)((df: DataFrame, col: String) => {
if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
})
如果它在没有 header 的情况下加载,那么您可以使用从自动分配的映射到实际的映射来做同样的事情。
我对 spark 和 scala 还很陌生,因此我对使用 spark 进行数据预处理和使用 rdds 有一些疑问。 我正在做一个小项目,我想用 spark 实现一个机器学习系统。我认为使用算法没问题,但我在预处理数据方面遇到了问题。 我有一个包含 30 列和大约一百万行的数据集。但为简单起见,假设我有以下数据集 (csv-file):
columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0
在 spark 中加载数据后,我想执行以下步骤:
- 删除所有以“_txt”结尾的列
- 过滤掉 columnB 为空的所有行(这个我已经知道了)
- 删除那些超过 9 级的列(这里是 columnA)
所以我对问题 1 和问题 3 有疑问。 我知道我不能删除列所以我必须创建一个新的 rdd 但如果没有某些列我该怎么做呢? 现在我在 spark 中加载没有 header 的 csv 文件,但为了我的任务,我需要这样做。是否建议将 header 加载到单独的 rdd 中?但是我怎样才能与那个 rdd 交互以找到正确的列呢? 抱歉,我知道很多问题,但我仍处于起步阶段,正在努力学习。 谢谢并致以最诚挚的问候, 克里斯
假设数据框加载了 headers 并且结构是扁平的:
val df = sqlContext.
read.
format("com.databricks.spark.csv").
option("header", "true").
load("data.csv")
像这样的东西应该可以工作:
import org.apache.spark.sql.DataFrame
def moreThan9(df: DataFrame, col: String) = {
df.agg(countDistinct(col)).first()(0) match {
case x: Long => x > 9L
case _ => false
}
}
val newDf = df.
schema. // Extract schema
toArray. // Convert to array
map(_.name). // Map to names
foldLeft(df)((df: DataFrame, col: String) => {
if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
})
如果它在没有 header 的情况下加载,那么您可以使用从自动分配的映射到实际的映射来做同样的事情。