Conditionally add column and value to Spark Rows
I am working with a Spark DataFrame (DF) and need to add a column to it on the fly, from within a call to mapPartitions:
// Don't worry about what 'widget' is or represents
val rdd = df.mapPartitions { rows => addColIfNecessary(rows, widget) }
Then:
def addColIfNecessary(rows: Iterator[Row], widget: Widget): Iterator[Row] = {
  rows.foreach { row =>
    if (widget.determineWhetherRowNeedsNewCol(row)) {
      // TODO: Add a new "fizz" column (of StringType) to the row
      val newVal: String = widget.getValueOfNewCol(row)
      row.addColumn("fizz", StringType, newVal)
    }
  }
  rows
}
This is obviously just pseudocode, but it conveys what I'm trying to do. Any ideas on how to actually implement it?
DataFrames are column-oriented structures, which means that adding a column to only some rows is not a good idea. Instead, you can take advantage of the support for nullable values in DataFrames: rather than adding an extra column, add an optional value to the rows based on some condition.
An example:
Let's take a DF of users and pages:
// assumes the implicits for toDF and $ are in scope (e.g. import spark.implicits._)
import org.apache.spark.sql.functions.udf

val users = Seq("Alice", "Bob", "Charly", "Dean", "Eve", "Flor", "Greta")
val pages = (1 to 9).map(i => s"page_$i")
val userPages = for {
  u <- users
  p <- pages
} yield (u, p)
val userPagesDF = sparkContext.parallelize(userPages).toDF("user", "page")
// A user-defined function that takes the last digit of the page name and uses it
// to compute a "rank"; it only ranks pages with a number higher than 7.
val rankUDF = udf((p: String) => if (p.takeRight(1).toInt > 7) "top" else null)
// New DF with the extra column "rank", which holds values for only some rows
val ranked = userPagesDF.withColumn("rank", rankUDF($"page"))
ranked.show
+-----+-------+----+
| user| page|rank|
+-----+-------+----+
|Alice| page_1|null|
|Alice| page_2|null|
|Alice| page_3|null|
|Alice| page_4|null|
|Alice| page_5|null|
|Alice| page_6|null|
|Alice| page_7|null|
|Alice| page_8| top|
|Alice| page_9| top|
| Bob| page_1|null|
| Bob| page_2|null|
| Bob| page_3|null|
| Bob| page_4|null|
| Bob| page_5|null|
| Bob| page_6|null|
| Bob| page_7|null|
| Bob| page_8| top|
| Bob| page_9| top|
+-----+-------+----+
ranked.printSchema
root
|-- user: string (nullable = true)
|-- page: string (nullable = true)
|-- rank: string (nullable = true)
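Applied back to your case, the same idea would look roughly like the sketch below. The Widget calls come from your pseudocode, but wrapping them in UDFs (and having them operate on column values rather than whole Rows) is only an assumption about how that logic could be exposed to Spark; the widget would also need to be serializable so the UDFs can run on the executors.

import org.apache.spark.sql.functions.{udf, when}

// Hypothetical: expose the widget's per-row logic as UDFs over the column(s) it inspects.
// "someCol" is a placeholder for whatever column(s) the widget actually looks at.
val needsFizz = udf((v: String) => widget.determineWhetherRowNeedsNewCol(v))
val fizzValue = udf((v: String) => widget.getValueOfNewCol(v))

// Every row gets a "fizz" column; rows that don't need a value simply carry null,
// because when(...) without an otherwise(...) evaluates to null.
val withFizz = df.withColumn("fizz", when(needsFizz($"someCol"), fizzValue($"someCol")))
withFizz.printSchema   // fizz: string (nullable = true)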