为什么 spark 数据帧上的 "withColumn" 转换不检查外部列表中的记录?

Why "withColumn" transformation on spark dataframe is not checking records from an external list?

我正在使用 Spark 和 Scala 进行学习。我遇到了一种情况,我需要比较 spark 数据框的一列中存在的记录的有效性。 这就是我创建一个数据框的方式,"dataframe1":

import sparkSession.implicits._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC").toDF("col1")

数据框 1:

+----+
|col1|
+----+
|  AB|
|  BC|
|  CD|
|  DA|
|  AB|
|  BC|
+----+

记录的有效性取决于记录是"AB"还是"BC"的条件。这是我的第一次尝试:

val dataframe2 = dataframe1.withColumn("col2", when('col1.contains("AB") or 'col1.contains("BC"), "valid").otherwise("invalid"))

dataframe2:

+----+-------+
|col1|   col2|
+----+-------+
|  AB|  valid|
|  BC|  valid|
|  CD|invalid|
|  DA|invalid|
|  AB|  valid|
|  BC|  valid|
+----+-------+

但我认为这不是一个好方法,因为如果我需要添加更多有效记录,那么我需要在 "when" 子句中添加条件,这会增加代码长度并扰乱代码可读性。

所以我尝试将所有有效记录放在一个列表中,并检查列表中是否存在记录字符串。如果它存在那么它是一个有效的记录否则不是。这是此试验的代码片段:

val validRecList = Seq("AB", "BC").toList
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))

但不知何故它没有按预期工作,结果是:

+----+-------+
|col1|   col2|
+----+-------+
|  AB|invalid|
|  BC|invalid|
|  CD|invalid|
|  DA|invalid|
|  AB|invalid|
|  BC|invalid|
+----+-------+

谁能告诉我我在这里犯了什么错误?并且,对于这种情况的任何其他一般建议。 谢谢。

试试这个:

import spark.implicits._
import org.apache.spark.sql.functions._

val dataframe1 = Seq("AB","BC","CD","DA","AB","BC", "XX").toDF("col1").as[(String)]
val validRecList = List("AB", "BC") 

val dataframe2 = dataframe1.withColumn("col2", when($"col1".isin(validRecList: _*), lit("valid")).otherwise (lit("invalid")))
dataframe2.show(false)

returns:

+----+-------+
|col1|col2   |
+----+-------+
|AB  |valid  |
|BC  |valid  |
|CD  |invalid|
|DA  |invalid|
|AB  |valid  |
|BC  |valid  |
|XX  |invalid|
+----+-------+

dataframe3 代码不起作用,因为当我们看到有关数据集 https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

上的 "withColumn" 函数的文档时

我们会看到 withColumn 接收 "String" 和 "Column" 作为参数类型。

所以这个代码

val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))

会将 col2 作为新的列名,但会将 lit("valid")lit("invalid") 作为列名。 if(validRecList.contains('col1.toString) lit("valid") else lit("invalid") 将作为 scala 代码执行,而不是作为 Dataset 操作或 Column 操作执行。

我的意思是这个 if(validRecList.contains('col1.toString) 是由 scala 而不是 spark 执行的,因为 "invalid" 结果是从 validRecList 派生的,列表中没有 'col1。但是当你定义 val validRecList = Seq('col1, "AB", "BC") 时,validRecList.contains('col1) 将 return true

此外,Dataset and on Column

不支持 IF 运算符

如果你想在 withColumn 函数上使用条件,你需要像这样表达 Column 类型的表达式:

dataframe3.withColumn("isContainRecList", $"col1".isin(validRecList: _*))

这个 $"col1".isin(validRecList: _*) 是一个 Column 类型的表达式,因为它将 return Column (基于文档)或者你可以使用 when(the_condition, value_if_true, value_if_false).

所以,我认为了解 Spark 引擎将处理我们的数据的类型很重要,如果我们没有给出 Column 类型表达式,它不会引用 'col1 数据,但它会引用'col1 作为 scala symbol.

此外,当您想使用 IF 时,也许您可​​以创建一个用户定义函数。

import org.apache.spark.sql.functions.udf
def checkValidRecList(needle: String): String = if(validRecList.contains(needle)) "valid" else "invalid"

val checkUdf = udf[String, String](checkValidRecList)

val dataframe3 = dataframe1.withColumn("col2", checkUdf('col1))

结果是:

scala> dataframe3.show(假)

+----+-------+
|col1|col2   |
+----+-------+
|AB  |valid  |
|BC  |valid  |
|CD  |invalid|
|DA  |invalid|
|AB  |valid  |
|BC  |valid  |
+----+-------+

但是,我认为我们应该使用记住这个 UDF 东西并不总是被推荐。