默认情况下,Spark sql 模式中的可空性是建议性的。严格执行它的最佳方法是什么?
Nullability in Spark sql schemas is advisory by default. What is best way to strictly enforce it?
我正在做一个简单的 ETL 项目,它读取 CSV 文件,执行
对每一列进行一些修改,然后将结果写为 JSON。
我想要读取我的结果的下游流程
确信我的输出符合
一个商定的架构,但我的问题是即使我定义
我的所有字段的输入模式都为 nullable=false,空值可以偷偷摸摸
并损坏我的输出文件,而且似乎没有(性能)方式我可以
让 Spark 为我的输入字段强制执行 'not null'。
这似乎是一项功能,正如 Spark 权威指南中所述:
when you define a schema where all columns are declared to not have
null values , Spark will not enforce that and will happily let null
values into that column. The nullable signal is simply to help Spark
SQL optimize for handling that column. If you have null values in
columns that should not have null values, you can get an incorrect
result or see strange exceptions that can be hard to debug.
我写了一个小检查工具来遍历数据框的每一行并且
如果在任何列中检测到空值(在任何级别
嵌套,在字段或子字段的情况下,如映射、结构或数组。)
我特别想知道:我是否用这个检查实用程序重新发明了轮子?是否有任何现有的图书馆,或
可以为我做到这一点的 Spark 技术(最好是比我实现的更好的方式)?
检查实用程序和我的管道的简化版本如下所示。如前所述,调用
检查实用程序已被注释掉。如果你 运行 没有启用检查实用程序,你会看到这个结果
/tmp/output.csv.
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
表头后第二行应该是一个数字,但它是一个空字符串
(我猜这就是 spark 写出 null 的方式。)这个输出对于
读取我的 ETL 作业输出的下游组件:这些组件只需要整数。
现在,我可以通过取消注释行来启用检查
//checkNulls(inDf)
当我这样做时,我得到一个异常,通知我无效的空值并打印
排除整个违规行,如下所示:
java.lang.RuntimeException: found null column value in row: [null,4]
Spark/Definitive 指南
中给出的一种可能的替代方法
Spark,权威指南提到了这样做的可能性:
<dataframe>.na.drop()
但这会 (AFAIK) 默默地删除不良记录,而不是标记不良记录。
然后我可以在下降前后对输入做一个 "set subtract" ,但这看起来像
找出什么是 null 什么不是 null 会严重影响性能。乍一看,我会
更喜欢我的方法....但我仍然想知道是否有更好的方法。
完整代码如下。谢谢!
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}
您可以使用 built-in 行方法 anyNull 拆分数据帧并以不同方式处理两个拆分:
val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)
如果您不打算使用手动 null-handling 流程,使用内置 DataFrame.na 方法会更简单,因为它已经实现了自动处理空值的所有常用方法(即删除或填充他们用默认值)。
我正在做一个简单的 ETL 项目,它读取 CSV 文件,执行 对每一列进行一些修改,然后将结果写为 JSON。 我想要读取我的结果的下游流程 确信我的输出符合 一个商定的架构,但我的问题是即使我定义 我的所有字段的输入模式都为 nullable=false,空值可以偷偷摸摸 并损坏我的输出文件,而且似乎没有(性能)方式我可以 让 Spark 为我的输入字段强制执行 'not null'。
这似乎是一项功能,正如 Spark 权威指南中所述:
when you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug.
我写了一个小检查工具来遍历数据框的每一行并且 如果在任何列中检测到空值(在任何级别 嵌套,在字段或子字段的情况下,如映射、结构或数组。)
我特别想知道:我是否用这个检查实用程序重新发明了轮子?是否有任何现有的图书馆,或 可以为我做到这一点的 Spark 技术(最好是比我实现的更好的方式)?
检查实用程序和我的管道的简化版本如下所示。如前所述,调用 检查实用程序已被注释掉。如果你 运行 没有启用检查实用程序,你会看到这个结果 /tmp/output.csv.
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
表头后第二行应该是一个数字,但它是一个空字符串 (我猜这就是 spark 写出 null 的方式。)这个输出对于 读取我的 ETL 作业输出的下游组件:这些组件只需要整数。
现在,我可以通过取消注释行来启用检查
//checkNulls(inDf)
当我这样做时,我得到一个异常,通知我无效的空值并打印 排除整个违规行,如下所示:
java.lang.RuntimeException: found null column value in row: [null,4]
Spark/Definitive 指南
中给出的一种可能的替代方法Spark,权威指南提到了这样做的可能性:
<dataframe>.na.drop()
但这会 (AFAIK) 默默地删除不良记录,而不是标记不良记录。 然后我可以在下降前后对输入做一个 "set subtract" ,但这看起来像 找出什么是 null 什么不是 null 会严重影响性能。乍一看,我会 更喜欢我的方法....但我仍然想知道是否有更好的方法。 完整代码如下。谢谢!
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}
您可以使用 built-in 行方法 anyNull 拆分数据帧并以不同方式处理两个拆分:
val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)
如果您不打算使用手动 null-handling 流程,使用内置 DataFrame.na 方法会更简单,因为它已经实现了自动处理空值的所有常用方法(即删除或填充他们用默认值)。