来自字符串数组的 Spark 2.2 Scala DataFrame select,捕获错误

Spark 2.2 Scala DataFrame select from string array, catching errors

我是 SparkSQL/Scala 的新手,我正在努力完成一些看似简单的任务。

我正在尝试从 Scala 字符串数组构建一些动态 SQL。我正在尝试重新键入我的 DataFrame 中的一些列,但我不知道我需要重新键入哪些列,直到运行时我可以看到 DataFrame 中的列集。所以我正在尝试这样做:

val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select( ...)  or df.selectExpr(...) // how to invoke this with vals from my string array??

typedCols 最终将成为一个字符串数组,其值如下:

["a", "cast(b as int) b", "c"]

我是否需要先从该数组创建一个逗号分隔的大字符串?

因此,假设这可行,我会调用 select 语句,它将我的 DataFrame 转换为具有我所需类型的新 DataFrame。但是 DataFrame 中的某些记录会出现错误,并且会导致重新输入失败。

我如何获得包含所有通过输入的好记录的 DataFrame 结果,然后将所有坏记录扔到某种错误桶中?在尝试 DataFrame select 之前,我是否需要先进行验证?

您可以只使用可变参数:

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|  a|   1|  c|
|foo|null|baz|
+---+----+---+

但我个人更喜欢专栏:

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?

cast 失败的数据是 NULL。要查找好的记录,请使用 na.drop:

val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()

查找空头支票是否有NULL

import org.apache.spark.sql.functions.col

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))

获取不匹配的数据:

  • 如果 typedColsSeq[Column] 你可以

    df.where(typedCols.map(_.isNull).reduce(_ || _))  
    
  • 如果 typedColsSeq[String] 你可以:

    import org.apache.spark.sql.functions.expr
    
    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))