Spark SQL 在数组中搜索结构
Spark SQL search inside an array for a struct
我的数据结构大致定义如下:
schema = StructType([
# ... fields skipped
StructField("extra_features",
ArrayType(StructType([
StructField("key", StringType(), False),
StructField("value", StringType(), True)
])), nullable = False)],
)
现在,我想在数组列中存在结构 {"key": "somekey", "value": "somevalue"}
的数据框中搜索条目。我该怎么做?
Spark 有一个函数 array_contains
可以用来检查 ArrayType
列的内容,但不幸的是它似乎不能处理复杂类型的数组。可以使用 UDF(用户定义函数)但是:
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as F
schema = StructType([StructField("extra_features", ArrayType(StructType([
StructField("key", StringType(), False),
StructField("value", StringType(), True)])),
False)])
df = spark.createDataFrame([
Row([{'key': 'a', 'value': '1'}]),
Row([{'key': 'b', 'value': '2'}])], schema)
# UDF to check whether {'key': 'a', 'value': '1'} is in an array
# The actual data of a (nested) StructType value is a Row
contains_keyval = F.udf(lambda extra_features: Row(key='a', value='1') in extra_features, BooleanType())
df.where(contains_keyval(df.extra_features)).collect()
这导致:
[Row(extra_features=[Row(key=u'a', value=u'1')])]
您还可以使用 UDF 添加另一列来指示 key-value 对是否存在:
df.withColumn('contains_it', contains_keyval(df.extra_features)).collect()
结果:
[Row(extra_features=[Row(key=u'a', value=u'1')], contains_it=True),
Row(extra_features=[Row(key=u'b', value=u'2')], contains_it=False)]
我的数据结构大致定义如下:
schema = StructType([
# ... fields skipped
StructField("extra_features",
ArrayType(StructType([
StructField("key", StringType(), False),
StructField("value", StringType(), True)
])), nullable = False)],
)
现在,我想在数组列中存在结构 {"key": "somekey", "value": "somevalue"}
的数据框中搜索条目。我该怎么做?
Spark 有一个函数 array_contains
可以用来检查 ArrayType
列的内容,但不幸的是它似乎不能处理复杂类型的数组。可以使用 UDF(用户定义函数)但是:
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as F
schema = StructType([StructField("extra_features", ArrayType(StructType([
StructField("key", StringType(), False),
StructField("value", StringType(), True)])),
False)])
df = spark.createDataFrame([
Row([{'key': 'a', 'value': '1'}]),
Row([{'key': 'b', 'value': '2'}])], schema)
# UDF to check whether {'key': 'a', 'value': '1'} is in an array
# The actual data of a (nested) StructType value is a Row
contains_keyval = F.udf(lambda extra_features: Row(key='a', value='1') in extra_features, BooleanType())
df.where(contains_keyval(df.extra_features)).collect()
这导致:
[Row(extra_features=[Row(key=u'a', value=u'1')])]
您还可以使用 UDF 添加另一列来指示 key-value 对是否存在:
df.withColumn('contains_it', contains_keyval(df.extra_features)).collect()
结果:
[Row(extra_features=[Row(key=u'a', value=u'1')], contains_it=True),
Row(extra_features=[Row(key=u'b', value=u'2')], contains_it=False)]