如何检测 Spark DataFrame 是否有列
How do I detect if a Spark DataFrame has a column
当我在 Spark SQL 中从 JSON 文件创建 DataFrame
时,如何在调用 .select
[=16= 之前判断给定列是否存在]
示例 JSON 架构:
{
"a": {
"b": 1,
"c": 2
}
}
这就是我想要做的:
potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
但我找不到适合 hasColumn
的函数。我得到的最接近的是测试该列是否在这个有点尴尬的数组中:
scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
实际上你甚至不需要调用 select 来使用列,你可以直接在数据框本身上调用它
// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)
// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
// then you can just use it on the DF with a given column name
hasColumn(testDF, "a") // <-- true
hasColumn(testDF, "c") // <-- false
或者,您可以使用 pimp my library 模式定义一个隐式 class,以便 hasColumn 方法可以直接在您的数据帧上使用
implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
def hasColumn(colName: String) = df.columns.contains(colName)
}
然后您可以将其用作:
testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
您的另一个选择是对 df.columns
和您的 potential_columns
.
进行一些数组操作(在本例中为 intersect
)
// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
// The columns we want to extract
val potential_columns = Seq("b", "c", "d")
// Get the intersect of the potential columns and the actual columns,
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
唉,这对上面的内部对象场景不起作用。您需要查看架构。
我要将您的 potential_columns
更改为完全限定的列名
val potential_columns = Seq("a.b", "a.c", "a.d")
// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)
// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
这只深入一层,因此要使其通用,您需要做更多的工作。
假设它存在并让它以 Try
失败。简单明了,支持任意嵌套:
import scala.util.Try
import org.apache.spark.sql.DataFrame
def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
val df = sqlContext.read.json(sc.parallelize(
"""{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
hasColumn(df, "foobar")
// Boolean = false
hasColumn(df, "foo")
// Boolean = true
hasColumn(df, "foo.bar")
// Boolean = true
hasColumn(df, "foo.bar.foobar")
// Boolean = true
hasColumn(df, "foo.bar.foobaz")
// Boolean = false
或者更简单:
val columns = Seq(
"foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
Python相当于:
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
has_column(df, "foobar")
## False
has_column(df, "foo")
## True
has_column(df, "foo.bar")
## True
has_column(df, "foo.bar.foobar")
## True
has_column(df, "foo.bar.foobaz")
## False
Try
不是最优的,因为它会在做出决定之前评估 Try
中的表达式。
对于大型数据集,在Scala
中使用以下内容:
df.schema.fieldNames.contains("column_name")
我通常使用的另一个选项是
df.columns.contains("column-name-to-check")
这个returns一个布尔值
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
Try(df.select(colName)).isSuccess
使用上述函数检查包含嵌套列名的列是否存在。
对于那些在寻找 Python 解决方案时偶然发现的人,我使用:
if 'column_name_to_check' in df.columns:
# do something
当我使用 Python 尝试@Jai Prakash 对 df.columns.contains('column-name-to-check')
的回答时,我得到了 AttributeError: 'list' object has no attribute 'contains'
。
如果您在加载时使用模式定义分解 json,则无需检查该列。如果它不在 json 源中,它将显示为空列。
val schemaJson = """
{
"type": "struct",
"fields": [
{
"name": field1
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": field2
"type": "string",
"nullable": true,
"metadata": {}
}
]
}
"""
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
val djson = sqlContext.read
.schema(schema )
.option("badRecordsPath", readExceptionPath)
.json(dataPath)
在 PySpark 中,df.columns 为您提供数据框中的列列表,因此
"colName" 在 df.columns
return 是真还是假。试一试。祝你好运!
对于嵌套列,您可以使用
df.schema.simpleString().find('column_name')
在 pyspark 中你可以简单地 运行
'field' in df.columns
当我在 Spark SQL 中从 JSON 文件创建 DataFrame
时,如何在调用 .select
[=16= 之前判断给定列是否存在]
示例 JSON 架构:
{
"a": {
"b": 1,
"c": 2
}
}
这就是我想要做的:
potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
但我找不到适合 hasColumn
的函数。我得到的最接近的是测试该列是否在这个有点尴尬的数组中:
scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
实际上你甚至不需要调用 select 来使用列,你可以直接在数据框本身上调用它
// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)
// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
// then you can just use it on the DF with a given column name
hasColumn(testDF, "a") // <-- true
hasColumn(testDF, "c") // <-- false
或者,您可以使用 pimp my library 模式定义一个隐式 class,以便 hasColumn 方法可以直接在您的数据帧上使用
implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
def hasColumn(colName: String) = df.columns.contains(colName)
}
然后您可以将其用作:
testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
您的另一个选择是对 df.columns
和您的 potential_columns
.
intersect
)
// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
// The columns we want to extract
val potential_columns = Seq("b", "c", "d")
// Get the intersect of the potential columns and the actual columns,
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
唉,这对上面的内部对象场景不起作用。您需要查看架构。
我要将您的 potential_columns
更改为完全限定的列名
val potential_columns = Seq("a.b", "a.c", "a.d")
// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)
// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
这只深入一层,因此要使其通用,您需要做更多的工作。
假设它存在并让它以 Try
失败。简单明了,支持任意嵌套:
import scala.util.Try
import org.apache.spark.sql.DataFrame
def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
val df = sqlContext.read.json(sc.parallelize(
"""{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
hasColumn(df, "foobar")
// Boolean = false
hasColumn(df, "foo")
// Boolean = true
hasColumn(df, "foo.bar")
// Boolean = true
hasColumn(df, "foo.bar.foobar")
// Boolean = true
hasColumn(df, "foo.bar.foobaz")
// Boolean = false
或者更简单:
val columns = Seq(
"foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
Python相当于:
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
has_column(df, "foobar")
## False
has_column(df, "foo")
## True
has_column(df, "foo.bar")
## True
has_column(df, "foo.bar.foobar")
## True
has_column(df, "foo.bar.foobaz")
## False
Try
不是最优的,因为它会在做出决定之前评估 Try
中的表达式。
对于大型数据集,在Scala
中使用以下内容:
df.schema.fieldNames.contains("column_name")
我通常使用的另一个选项是
df.columns.contains("column-name-to-check")
这个returns一个布尔值
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
Try(df.select(colName)).isSuccess
使用上述函数检查包含嵌套列名的列是否存在。
对于那些在寻找 Python 解决方案时偶然发现的人,我使用:
if 'column_name_to_check' in df.columns:
# do something
当我使用 Python 尝试@Jai Prakash 对 df.columns.contains('column-name-to-check')
的回答时,我得到了 AttributeError: 'list' object has no attribute 'contains'
。
如果您在加载时使用模式定义分解 json,则无需检查该列。如果它不在 json 源中,它将显示为空列。
val schemaJson = """
{
"type": "struct",
"fields": [
{
"name": field1
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": field2
"type": "string",
"nullable": true,
"metadata": {}
}
]
}
"""
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
val djson = sqlContext.read
.schema(schema )
.option("badRecordsPath", readExceptionPath)
.json(dataPath)
在 PySpark 中,df.columns 为您提供数据框中的列列表,因此 "colName" 在 df.columns return 是真还是假。试一试。祝你好运!
对于嵌套列,您可以使用
df.schema.simpleString().find('column_name')
在 pyspark 中你可以简单地 运行
'field' in df.columns