使用 apache spark 和 scala 进行数据预处理

Data preprocessing with apache spark and scala

我对 spark 和 scala 还很陌生,因此我对使用 spark 进行数据预处理和使用 rdds 有一些疑问。 我正在做一个小项目,我想用 spark 实现一个机器学习系统。我认为使用算法没问题,但我在预处理数据方面遇到了问题。 我有一个包含 30 列和大约一百万行的数据集。但为简单起见,假设我有以下数据集 (csv-file):

columnA, columnB, column_txt, label
1      , a      , abc       , 0
2      ,        , abc       , 0
3      , b      , abc       , 1
4      , b      , abc       , 1
5      , a      , abc       , 0
6      ,        , abc       , 0
7      , c      , abc       , 1
8      , a      , abc       , 1
9      , b      , abc       , 1
10     , c      , abc       , 0

在 spark 中加载数据后,我想执行以下步骤:

  1. 删除所有以“_txt”结尾的列
  2. 过滤掉 columnB 为空的所有行(这个我已经知道了)
  3. 删除那些超过 9 级的列(这里是 columnA)

所以我对问题 1 和问题 3 有疑问。 我知道我不能删除列所以我必须创建一个新的 rdd 但如果没有某些列我该怎么做呢? 现在我在 spark 中加载没有 header 的 csv 文件,但为了我的任务,我需要这样做。是否建议将 header 加载到单独的 rdd 中?但是我怎样才能与那个 rdd 交互以找到正确的列呢? 抱歉,我知道很多问题,但我仍处于起步阶段,正在努力学习。 谢谢并致以最诚挚的问候, 克里斯

假设数据框加载了 headers 并且结构是扁平的:

val df = sqlContext.
    read.
    format("com.databricks.spark.csv").
    option("header", "true").
    load("data.csv")

像这样的东西应该可以工作:

import org.apache.spark.sql.DataFrame

def moreThan9(df: DataFrame, col: String) = {
    df.agg(countDistinct(col)).first()(0) match {
        case x: Long => x > 9L
        case _ => false
    }
}

val newDf = df.
    schema. //  Extract schema
    toArray. // Convert to array
    map(_.name). // Map to names
    foldLeft(df)((df: DataFrame, col: String) => {
        if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
    })

如果它在没有 header 的情况下加载,那么您可以使用从自动分配的映射到实际的映射来做同样的事情。