使用函数返回一个新的数据框(通过转换现有的数据框)- spark/scala
Returning a new Dataframe (by transforming an existing one) using a function - spark/scala
我是 Spark 的新手。我正在尝试将 JSONArray
读入 Dataframe
并对其执行一些转换。我试图通过删除一些 html
标签和一些 newline
字符来清理我的数据。例如:
从JSON读取的初始数据帧:
+-----+---+-----+-------------------------------+
|index| X|label| date |
+-----+---+-----+-------------------------------+
| 1| 1| A|<div>"2017-01-01"</div>|
| 2| 3| B|<div>2017-01-02</div> |
| 3| 5| A|<div>2017-01-03</div> |
| 4| 7| B|<div>2017-01-04</div> |
+-----+---+-----+-------------------------------+
应转换为:
+-----+---+-----+------------+
|index| X|label| date |
+-----+---+-----+------------+
| 1| 1| A|'2017-01-01'|
| 2| 3| B|2017-01-02 |
| 3| 5| A|2017-01-03 |
| 4| 7| B|2017-01-04 |
+-----+---+-----+------------+
我知道我们可以使用以下方法执行这些转换:
df.withColumn("col_name",regexp_replace("col_name",pattern,replacement))
我可以使用 withColumn
清理我的数据,如上所示。但是,我有大量的列并且为每一列编写一个 .withColumn
方法似乎并不优雅、简洁或高效。所以我尝试做这样的事情:
val finalDF = htmlCleanse(intialDF, columnsArray)
def htmlCleanse(df: DataFrame, columns: Array[String]): DataFrame = {
var retDF = hiveContext.emptyDataFrame
for(i <- 0 to columns.size-1){
val name = columns(i)
retDF = df.withColumn(name,regexp_replace(col(name),"<(?:\"[^\"]*\"['\"]*|'[^']*'['\"]*|[^'\">])+>",""))
.withColumn(name,regexp_replace(col(name),""","'"))
.withColumn(name,regexp_replace(col(name)," "," "))
.withColumn(name,regexp_replace(col(name),":",":"))
}
retDF
}
我定义了一个新函数 htmlCleanse
并将要转换的 Dataframe 和列数组传递给该函数。该函数创建一个新的 emptyDataFrame
并遍历列列表,对单个迭代的列执行清理,并将转换后的 df
分配给 retDF
变量。
这没有给我任何错误,但它似乎并没有从所有列中删除 html 标记,而某些列似乎已被清除。不确定这种不一致行为的原因是什么(对此有什么想法吗?)。
那么,清理数据的有效方法是什么?任何帮助,将不胜感激。谢谢!
第一个问题是初始化一个空框架什么都不做,你只是创建一些新的东西。然后你不能 "add" 在没有连接的情况下从另一个数据帧向它发送东西(这在性能方面是个坏主意)。
第二个问题是retDF总是从df定义的。这意味着除了清理最后一列之外,您将丢弃所做的所有操作。
相反,您应该将 retDF 初始化为 df,并在每次迭代中修复一个列并覆盖 retDF,如下所示:
def htmlCleanse(df: DataFrame, columns: Array[String]): DataFrame = {
var retDF = df
for(i <- 0 to columns.size-1){
val name = columns(i)
retDF = retDF.withColumn(name,regexp_replace(col(name),"<(?:\"[^\"]*\"['\"]*|'[^']*'['\"]*|[^'\">])+>",""))
.withColumn(name,regexp_replace(col(name),""","'"))
.withColumn(name,regexp_replace(col(name)," "," "))
.withColumn(name,regexp_replace(col(name),":",":"))
}
retDF
}
我是 Spark 的新手。我正在尝试将 JSONArray
读入 Dataframe
并对其执行一些转换。我试图通过删除一些 html
标签和一些 newline
字符来清理我的数据。例如:
从JSON读取的初始数据帧:
+-----+---+-----+-------------------------------+
|index| X|label| date |
+-----+---+-----+-------------------------------+
| 1| 1| A|<div>"2017-01-01"</div>|
| 2| 3| B|<div>2017-01-02</div> |
| 3| 5| A|<div>2017-01-03</div> |
| 4| 7| B|<div>2017-01-04</div> |
+-----+---+-----+-------------------------------+
应转换为:
+-----+---+-----+------------+
|index| X|label| date |
+-----+---+-----+------------+
| 1| 1| A|'2017-01-01'|
| 2| 3| B|2017-01-02 |
| 3| 5| A|2017-01-03 |
| 4| 7| B|2017-01-04 |
+-----+---+-----+------------+
我知道我们可以使用以下方法执行这些转换:
df.withColumn("col_name",regexp_replace("col_name",pattern,replacement))
我可以使用 withColumn
清理我的数据,如上所示。但是,我有大量的列并且为每一列编写一个 .withColumn
方法似乎并不优雅、简洁或高效。所以我尝试做这样的事情:
val finalDF = htmlCleanse(intialDF, columnsArray)
def htmlCleanse(df: DataFrame, columns: Array[String]): DataFrame = {
var retDF = hiveContext.emptyDataFrame
for(i <- 0 to columns.size-1){
val name = columns(i)
retDF = df.withColumn(name,regexp_replace(col(name),"<(?:\"[^\"]*\"['\"]*|'[^']*'['\"]*|[^'\">])+>",""))
.withColumn(name,regexp_replace(col(name),""","'"))
.withColumn(name,regexp_replace(col(name)," "," "))
.withColumn(name,regexp_replace(col(name),":",":"))
}
retDF
}
我定义了一个新函数 htmlCleanse
并将要转换的 Dataframe 和列数组传递给该函数。该函数创建一个新的 emptyDataFrame
并遍历列列表,对单个迭代的列执行清理,并将转换后的 df
分配给 retDF
变量。
这没有给我任何错误,但它似乎并没有从所有列中删除 html 标记,而某些列似乎已被清除。不确定这种不一致行为的原因是什么(对此有什么想法吗?)。
那么,清理数据的有效方法是什么?任何帮助,将不胜感激。谢谢!
第一个问题是初始化一个空框架什么都不做,你只是创建一些新的东西。然后你不能 "add" 在没有连接的情况下从另一个数据帧向它发送东西(这在性能方面是个坏主意)。
第二个问题是retDF总是从df定义的。这意味着除了清理最后一列之外,您将丢弃所做的所有操作。
相反,您应该将 retDF 初始化为 df,并在每次迭代中修复一个列并覆盖 retDF,如下所示:
def htmlCleanse(df: DataFrame, columns: Array[String]): DataFrame = {
var retDF = df
for(i <- 0 to columns.size-1){
val name = columns(i)
retDF = retDF.withColumn(name,regexp_replace(col(name),"<(?:\"[^\"]*\"['\"]*|'[^']*'['\"]*|[^'\">])+>",""))
.withColumn(name,regexp_replace(col(name),""","'"))
.withColumn(name,regexp_replace(col(name)," "," "))
.withColumn(name,regexp_replace(col(name),":",":"))
}
retDF
}