Spark:如果数据框中不存在列,则Return空列
Spark: Return empty column if column does not exist in dataframe
如下面的代码所示,我正在将一个 JSON 文件读入一个数据帧,然后从该数据帧中选择一些字段到另一个数据帧中。
df_record = spark.read.json("path/to/file.JSON",multiLine=True)
df_basicInfo = df_record.select(col("key1").alias("ID"), \
col("key2").alias("Status"), \
col("key3.ResponseType").alias("ResponseType"), \
col("key3.someIndicator").alias("SomeIndicator") \
)
问题是,有时 JSON 文件没有我尝试获取的一些密钥 - 例如 ResponseType
。所以它最终会抛出如下错误:
org.apache.spark.sql.AnalysisException: No such struct field ResponseType
如何在不强制读取模式的情况下解决这个问题?当它不可用时,是否可以使它 return 在该列下成为 NULL?
是否提到如何检测数据框中的列是否可用。但是,这个问题是关于如何使用该功能的。
使用 has_column
函数定义 by zero323 and general guidelines about adding empty columns
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *
if has_column(df_record, "key3.ResponseType"):
df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
# Adjust types according to your needs
df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string"))
并为您需要的每一列重复,或者
df_record.withColumn(
"ResponseType",
when(
lit(has_column(df_record, "key3.ResponseType")),
col("key3.ResponseType")
).otherwise(lit(None).cast("string"))
根据您的要求调整类型,并对其余列重复此过程。
或者定义一个涵盖所有所需类型的模式:
schema = StructType([
StructField("key1", StringType()),
StructField("key2", StringType()),
StructField("key2", StructType([
StructField("ResponseType", StringType()),
StructField("someIndicator", StringType()),
]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)
(再次调整类型),并使用您当前的代码。
Spark 缺少一个简单的函数:struct_has(STRUCT, PATH)
或 struct_get(STRUCT, PATH, DEFAULT)
其中 PATH
使用点表示法。
所以我写了一个很简单的UDF:
来自https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row
spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {
var fields = path.split("\.")
var buffer:Row = root
val lastItem = fields.last
fields = fields.dropRight(1)
fields.foreach( (field:String) => {
if (buffer != null) {
if (buffer.schema.fieldNames.contains(field)) {
buffer = buffer.getStruct(buffer.fieldIndex(field))
} else {
buffer = null
}
}
})
if (buffer == null) {
defaultValue
} else {
buffer.getString(buffer.fieldIndex(lastItem))
}
})
这让你可以这样查询:
SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA
我遇到了同样的问题,我使用了与 Thomas 类似的方法。
我的用户自定义函数代码:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row
spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
var buffer:Row = root
if (buffer != null) {
if (buffer.schema.fieldNames.contains(fieldName)) {
buffer.getString(buffer.fieldIndex(fieldName))
} else {
null
}
}
else {
null
}
})
然后是我的查询:
%sql
SELECT
Id,
Created,
Payload.Type,
tryGet(Payload, "Error") as Error,
FROM dataWithJson
WHERE Payload.Type = 'Action'
所以我尝试使用已接受的答案,但是我发现如果列 key3.ResponseType
不存在,它将失败。
你可以这样做 -
def hasColumn(df: DataFrame, path: String) =
if (Try(df(path)).isSuccess == true) {
df(path)
}
else {
lit(null)
}
如果列存在,则在函数中求值,如果不存在,则只是 returns 一个 NULL 列。
您现在可以使用它 -
df_basicInfo = df_record.withColumn("ResponseType", hasColumn(df_record, "key3.ResponseType"))
如下面的代码所示,我正在将一个 JSON 文件读入一个数据帧,然后从该数据帧中选择一些字段到另一个数据帧中。
df_record = spark.read.json("path/to/file.JSON",multiLine=True)
df_basicInfo = df_record.select(col("key1").alias("ID"), \
col("key2").alias("Status"), \
col("key3.ResponseType").alias("ResponseType"), \
col("key3.someIndicator").alias("SomeIndicator") \
)
问题是,有时 JSON 文件没有我尝试获取的一些密钥 - 例如 ResponseType
。所以它最终会抛出如下错误:
org.apache.spark.sql.AnalysisException: No such struct field ResponseType
如何在不强制读取模式的情况下解决这个问题?当它不可用时,是否可以使它 return 在该列下成为 NULL?
使用 has_column
函数定义
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *
if has_column(df_record, "key3.ResponseType"):
df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
# Adjust types according to your needs
df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string"))
并为您需要的每一列重复,或者
df_record.withColumn(
"ResponseType",
when(
lit(has_column(df_record, "key3.ResponseType")),
col("key3.ResponseType")
).otherwise(lit(None).cast("string"))
根据您的要求调整类型,并对其余列重复此过程。
或者定义一个涵盖所有所需类型的模式:
schema = StructType([
StructField("key1", StringType()),
StructField("key2", StringType()),
StructField("key2", StructType([
StructField("ResponseType", StringType()),
StructField("someIndicator", StringType()),
]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)
(再次调整类型),并使用您当前的代码。
Spark 缺少一个简单的函数:struct_has(STRUCT, PATH)
或 struct_get(STRUCT, PATH, DEFAULT)
其中 PATH
使用点表示法。
所以我写了一个很简单的UDF:
来自https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row
spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {
var fields = path.split("\.")
var buffer:Row = root
val lastItem = fields.last
fields = fields.dropRight(1)
fields.foreach( (field:String) => {
if (buffer != null) {
if (buffer.schema.fieldNames.contains(field)) {
buffer = buffer.getStruct(buffer.fieldIndex(field))
} else {
buffer = null
}
}
})
if (buffer == null) {
defaultValue
} else {
buffer.getString(buffer.fieldIndex(lastItem))
}
})
这让你可以这样查询:
SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA
我遇到了同样的问题,我使用了与 Thomas 类似的方法。 我的用户自定义函数代码:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row
spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
var buffer:Row = root
if (buffer != null) {
if (buffer.schema.fieldNames.contains(fieldName)) {
buffer.getString(buffer.fieldIndex(fieldName))
} else {
null
}
}
else {
null
}
})
然后是我的查询:
%sql
SELECT
Id,
Created,
Payload.Type,
tryGet(Payload, "Error") as Error,
FROM dataWithJson
WHERE Payload.Type = 'Action'
所以我尝试使用已接受的答案,但是我发现如果列 key3.ResponseType
不存在,它将失败。
你可以这样做 -
def hasColumn(df: DataFrame, path: String) =
if (Try(df(path)).isSuccess == true) {
df(path)
}
else {
lit(null)
}
如果列存在,则在函数中求值,如果不存在,则只是 returns 一个 NULL 列。
您现在可以使用它 -
df_basicInfo = df_record.withColumn("ResponseType", hasColumn(df_record, "key3.ResponseType"))