为什么 spark 数据帧上的 "withColumn" 转换不检查外部列表中的记录?
Why "withColumn" transformation on spark dataframe is not checking records from an external list?
我正在使用 Spark 和 Scala 进行学习。我遇到了一种情况,我需要比较 spark 数据框的一列中存在的记录的有效性。
这就是我创建一个数据框的方式,"dataframe1":
import sparkSession.implicits._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC").toDF("col1")
数据框 1:
+----+
|col1|
+----+
| AB|
| BC|
| CD|
| DA|
| AB|
| BC|
+----+
记录的有效性取决于记录是"AB"还是"BC"的条件。这是我的第一次尝试:
val dataframe2 = dataframe1.withColumn("col2", when('col1.contains("AB") or 'col1.contains("BC"), "valid").otherwise("invalid"))
dataframe2:
+----+-------+
|col1| col2|
+----+-------+
| AB| valid|
| BC| valid|
| CD|invalid|
| DA|invalid|
| AB| valid|
| BC| valid|
+----+-------+
但我认为这不是一个好方法,因为如果我需要添加更多有效记录,那么我需要在 "when" 子句中添加条件,这会增加代码长度并扰乱代码可读性。
所以我尝试将所有有效记录放在一个列表中,并检查列表中是否存在记录字符串。如果它存在那么它是一个有效的记录否则不是。这是此试验的代码片段:
val validRecList = Seq("AB", "BC").toList
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))
但不知何故它没有按预期工作,结果是:
+----+-------+
|col1| col2|
+----+-------+
| AB|invalid|
| BC|invalid|
| CD|invalid|
| DA|invalid|
| AB|invalid|
| BC|invalid|
+----+-------+
谁能告诉我我在这里犯了什么错误?并且,对于这种情况的任何其他一般建议。
谢谢。
试试这个:
import spark.implicits._
import org.apache.spark.sql.functions._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC", "XX").toDF("col1").as[(String)]
val validRecList = List("AB", "BC")
val dataframe2 = dataframe1.withColumn("col2", when($"col1".isin(validRecList: _*), lit("valid")).otherwise (lit("invalid")))
dataframe2.show(false)
returns:
+----+-------+
|col1|col2 |
+----+-------+
|AB |valid |
|BC |valid |
|CD |invalid|
|DA |invalid|
|AB |valid |
|BC |valid |
|XX |invalid|
+----+-------+
dataframe3
代码不起作用,因为当我们看到有关数据集 https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset
上的 "withColumn" 函数的文档时
我们会看到 withColumn 接收 "String" 和 "Column" 作为参数类型。
所以这个代码
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))
会将 col2 作为新的列名,但会将 lit("valid")
或 lit("invalid")
作为列名。 if(validRecList.contains('col1.toString) lit("valid") else lit("invalid")
将作为 scala 代码执行,而不是作为 Dataset 操作或 Column 操作执行。
我的意思是这个 if(validRecList.contains('col1.toString)
是由 scala 而不是 spark 执行的,因为 "invalid" 结果是从 validRecList 派生的,列表中没有 'col1。但是当你定义 val validRecList = Seq('col1, "AB", "BC")
时,validRecList.contains('col1)
将 return true
不支持 IF
运算符
如果你想在 withColumn 函数上使用条件,你需要像这样表达 Column 类型的表达式:
dataframe3.withColumn("isContainRecList", $"col1".isin(validRecList: _*))
这个 $"col1".isin(validRecList: _*)
是一个 Column 类型的表达式,因为它将 return Column (基于文档)或者你可以使用 when(the_condition, value_if_true, value_if_false)
.
所以,我认为了解 Spark 引擎将处理我们的数据的类型很重要,如果我们没有给出 Column 类型表达式,它不会引用 'col1 数据,但它会引用'col1 作为 scala symbol.
此外,当您想使用 IF
时,也许您可以创建一个用户定义函数。
import org.apache.spark.sql.functions.udf
def checkValidRecList(needle: String): String = if(validRecList.contains(needle)) "valid" else "invalid"
val checkUdf = udf[String, String](checkValidRecList)
val dataframe3 = dataframe1.withColumn("col2", checkUdf('col1))
结果是:
scala> dataframe3.show(假)
+----+-------+
|col1|col2 |
+----+-------+
|AB |valid |
|BC |valid |
|CD |invalid|
|DA |invalid|
|AB |valid |
|BC |valid |
+----+-------+
但是,我认为我们应该使用记住这个 UDF 东西并不总是被推荐。
我正在使用 Spark 和 Scala 进行学习。我遇到了一种情况,我需要比较 spark 数据框的一列中存在的记录的有效性。 这就是我创建一个数据框的方式,"dataframe1":
import sparkSession.implicits._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC").toDF("col1")
数据框 1:
+----+
|col1|
+----+
| AB|
| BC|
| CD|
| DA|
| AB|
| BC|
+----+
记录的有效性取决于记录是"AB"还是"BC"的条件。这是我的第一次尝试:
val dataframe2 = dataframe1.withColumn("col2", when('col1.contains("AB") or 'col1.contains("BC"), "valid").otherwise("invalid"))
dataframe2:
+----+-------+
|col1| col2|
+----+-------+
| AB| valid|
| BC| valid|
| CD|invalid|
| DA|invalid|
| AB| valid|
| BC| valid|
+----+-------+
但我认为这不是一个好方法,因为如果我需要添加更多有效记录,那么我需要在 "when" 子句中添加条件,这会增加代码长度并扰乱代码可读性。
所以我尝试将所有有效记录放在一个列表中,并检查列表中是否存在记录字符串。如果它存在那么它是一个有效的记录否则不是。这是此试验的代码片段:
val validRecList = Seq("AB", "BC").toList
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))
但不知何故它没有按预期工作,结果是:
+----+-------+
|col1| col2|
+----+-------+
| AB|invalid|
| BC|invalid|
| CD|invalid|
| DA|invalid|
| AB|invalid|
| BC|invalid|
+----+-------+
谁能告诉我我在这里犯了什么错误?并且,对于这种情况的任何其他一般建议。 谢谢。
试试这个:
import spark.implicits._
import org.apache.spark.sql.functions._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC", "XX").toDF("col1").as[(String)]
val validRecList = List("AB", "BC")
val dataframe2 = dataframe1.withColumn("col2", when($"col1".isin(validRecList: _*), lit("valid")).otherwise (lit("invalid")))
dataframe2.show(false)
returns:
+----+-------+
|col1|col2 |
+----+-------+
|AB |valid |
|BC |valid |
|CD |invalid|
|DA |invalid|
|AB |valid |
|BC |valid |
|XX |invalid|
+----+-------+
dataframe3
代码不起作用,因为当我们看到有关数据集 https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset
我们会看到 withColumn 接收 "String" 和 "Column" 作为参数类型。
所以这个代码
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))
会将 col2 作为新的列名,但会将 lit("valid")
或 lit("invalid")
作为列名。 if(validRecList.contains('col1.toString) lit("valid") else lit("invalid")
将作为 scala 代码执行,而不是作为 Dataset 操作或 Column 操作执行。
我的意思是这个 if(validRecList.contains('col1.toString)
是由 scala 而不是 spark 执行的,因为 "invalid" 结果是从 validRecList 派生的,列表中没有 'col1。但是当你定义 val validRecList = Seq('col1, "AB", "BC")
时,validRecList.contains('col1)
将 return true
IF
运算符
如果你想在 withColumn 函数上使用条件,你需要像这样表达 Column 类型的表达式:
dataframe3.withColumn("isContainRecList", $"col1".isin(validRecList: _*))
这个 $"col1".isin(validRecList: _*)
是一个 Column 类型的表达式,因为它将 return Column (基于文档)或者你可以使用 when(the_condition, value_if_true, value_if_false)
.
所以,我认为了解 Spark 引擎将处理我们的数据的类型很重要,如果我们没有给出 Column 类型表达式,它不会引用 'col1 数据,但它会引用'col1 作为 scala symbol.
此外,当您想使用 IF
时,也许您可以创建一个用户定义函数。
import org.apache.spark.sql.functions.udf
def checkValidRecList(needle: String): String = if(validRecList.contains(needle)) "valid" else "invalid"
val checkUdf = udf[String, String](checkValidRecList)
val dataframe3 = dataframe1.withColumn("col2", checkUdf('col1))
结果是:
scala> dataframe3.show(假)
+----+-------+
|col1|col2 |
+----+-------+
|AB |valid |
|BC |valid |
|CD |invalid|
|DA |invalid|
|AB |valid |
|BC |valid |
+----+-------+
但是,我认为我们应该使用记住这个 UDF 东西并不总是被推荐。