使用 SQL 查询 pyspark 数据框查找所有空值
Find all nulls with SQL query over pyspark dataframe
我有一个 StructField
的数据框,它具有混合模式(DoubleType
、StringType
、LongType
等)。
我想 'iterate' 遍历所有列以 return 汇总统计信息。例如:
set_min = df.select([
fn.min(self.df[c]).alias(c) for c in self.df.columns
]).collect()
是我用来在每列中查找最小值的方法。那很好用。但是当我尝试类似查找空值的设计时:
set_null = df.filter(
(lambda x: self.df[x]).isNull().count()
).collect()
我得到了有意义的 TypeError: condition should be string or Column
,我正在传递一个函数。
或列表理解:
set_null = self.df[c].alias(c).isNull() for c in self.df.columns
然后我尝试将 SQL 查询作为字符串传递给它:
set_null = df.filter('SELECT fields FROM table WHERE column = NUL').collect()
我得到:
ParseException: "\nmismatched input 'FROM' expecting <EOF>(line 1, pos 14)\n\n== SQL ==\nSELECT fields FROM table WHERE column = NULL\n--------------^^^\n"
我如何将我的函数作为 'string or column' 传递,以便我可以使用 filter
或 where
,为什么纯 SQL 语句不起作用?
如果您想计算每列 NULL
个值,您可以计算非空值并从总数中减去。
例如:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
spark = SparkSession.builder.master("local").getOrCreate()
df = spark.createDataFrame(
data=[
(1, None),
(1, 1),
(None, None),
(1, 1),
(None, 1),
(1, None),
],
schema=("A", "B")
)
total = df.count()
missing_counts = df.select(
*[(total - fn.count(col)).alias("missing(%s)" % col) for col in df.columns]
)
missing_counts.show()
>>> +----------+----------+
... |missing(A)|missing(B)|
... +----------+----------+
... | 2| 3|
... +----------+----------+
您尝试的几个部分有问题:
- 您的列表理解示例中缺少方括号
- 您在
NUL
中错过了一个 L
- 您的纯 SQL 不起作用,因为
filter
/where
需要一个 where 子句,而不是完整的 SQL 语句;它们只是别名,我更喜欢使用 where
这样更清楚你只需要给出这样一个子句
最后你不需要使用 where
,就像 karlson 也显示的那样。但是从总数中减去意味着你必须对数据帧进行两次评估(这可以通过缓存来缓解,但仍然不理想)。还有更直接的方法:
>>> df.select([fn.sum(fn.isnull(c).cast('int')).alias(c) for c in df.columns]).show()
+---+---+
| A| B|
+---+---+
| 2| 3|
+---+---+
这是可行的,因为将布尔值转换为整数会为 True
提供 1
,为 False
提供 0
。如果你更喜欢SQL,等价于:
df.select([fn.expr('SUM(CAST(({c} IS NULL) AS INT)) AS {c}'.format(c=c)) for c in df.columns]).show()
或更好,没有演员:
df.select([fn.expr('SUM(IF({c} IS NULL, 1, 0)) AS {c}'.format(c=c)) for c in df.columns]).show()
我有一个 StructField
的数据框,它具有混合模式(DoubleType
、StringType
、LongType
等)。
我想 'iterate' 遍历所有列以 return 汇总统计信息。例如:
set_min = df.select([
fn.min(self.df[c]).alias(c) for c in self.df.columns
]).collect()
是我用来在每列中查找最小值的方法。那很好用。但是当我尝试类似查找空值的设计时:
set_null = df.filter(
(lambda x: self.df[x]).isNull().count()
).collect()
我得到了有意义的 TypeError: condition should be string or Column
,我正在传递一个函数。
或列表理解:
set_null = self.df[c].alias(c).isNull() for c in self.df.columns
然后我尝试将 SQL 查询作为字符串传递给它:
set_null = df.filter('SELECT fields FROM table WHERE column = NUL').collect()
我得到:
ParseException: "\nmismatched input 'FROM' expecting <EOF>(line 1, pos 14)\n\n== SQL ==\nSELECT fields FROM table WHERE column = NULL\n--------------^^^\n"
我如何将我的函数作为 'string or column' 传递,以便我可以使用 filter
或 where
,为什么纯 SQL 语句不起作用?
如果您想计算每列 NULL
个值,您可以计算非空值并从总数中减去。
例如:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
spark = SparkSession.builder.master("local").getOrCreate()
df = spark.createDataFrame(
data=[
(1, None),
(1, 1),
(None, None),
(1, 1),
(None, 1),
(1, None),
],
schema=("A", "B")
)
total = df.count()
missing_counts = df.select(
*[(total - fn.count(col)).alias("missing(%s)" % col) for col in df.columns]
)
missing_counts.show()
>>> +----------+----------+
... |missing(A)|missing(B)|
... +----------+----------+
... | 2| 3|
... +----------+----------+
您尝试的几个部分有问题:
- 您的列表理解示例中缺少方括号
- 您在
NUL
中错过了一个 L
- 您的纯 SQL 不起作用,因为
filter
/where
需要一个 where 子句,而不是完整的 SQL 语句;它们只是别名,我更喜欢使用where
这样更清楚你只需要给出这样一个子句
最后你不需要使用 where
,就像 karlson 也显示的那样。但是从总数中减去意味着你必须对数据帧进行两次评估(这可以通过缓存来缓解,但仍然不理想)。还有更直接的方法:
>>> df.select([fn.sum(fn.isnull(c).cast('int')).alias(c) for c in df.columns]).show()
+---+---+
| A| B|
+---+---+
| 2| 3|
+---+---+
这是可行的,因为将布尔值转换为整数会为 True
提供 1
,为 False
提供 0
。如果你更喜欢SQL,等价于:
df.select([fn.expr('SUM(CAST(({c} IS NULL) AS INT)) AS {c}'.format(c=c)) for c in df.columns]).show()
或更好,没有演员:
df.select([fn.expr('SUM(IF({c} IS NULL, 1, 0)) AS {c}'.format(c=c)) for c in df.columns]).show()