Spark DataFrame filter operation
I have a Spark DataFrame to which I apply a filter string. The filter selects only some of the rows, and I would like to know the reason why the remaining rows were not selected.
Example:
DataFrame columns: customer_id|col_a|col_b|col_c|col_d
Filter string: col_a > 0 & col_b > 4 & col_c < 0 & col_d=0
etc...
reason_for_exclusion can be any string or letter, as long as it explains why a particular row was excluded.
I could split the filter string and apply each filter separately, but my filter strings are huge and doing that would be inefficient, so I'm just checking whether there is a better way to do this?
Thanks!
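For reference, a minimal sketch of the split-and-apply approach mentioned above, assuming PySpark, an existing DataFrame df, and a flat, &-separated filter string like the example (names here are illustrative); it makes one pass over the DataFrame per condition, which is what gets slow with huge filter strings:
from pyspark.sql.functions import expr, lit

filter_str = "col_a > 0 & col_b > 4 & col_c < 0 & col_d=0"
conditions = [c.strip() for c in filter_str.split("&")]

# one filter pass per condition: each pass shows the rows failing that condition
for cond in conditions:
    df.filter(~expr(cond)).withColumn("reason_for_exclusion", lit(cond)).show()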
You would have to check each condition in the filter expression, which can be expensive compared with the simple operation of filtering itself.
I would suggest showing the same reason for all the excluded rows, since each such row fails at least one condition in that expression. It's not pretty, but I'd prefer it because it's efficient, especially when you have to handle very large DataFrames.
from pyspark.sql.functions import expr, lit, when

# assumes an existing SparkSession named `spark`
data = [(1, 1, 5, -3, 0), (2, 0, 10, -1, 0), (3, 0, 10, -4, 1)]
df = spark.createDataFrame(data, ["customer_id", "col_a", "col_b", "col_c", "col_d"])

filter_expr = "col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0"

# rows failing the full expression get the whole expression as the reason
filtered_df = df.withColumn("reason_for_exclusion",
                            when(~expr(filter_expr), lit(filter_expr))
                            .otherwise(lit(None))
                            )
filtered_df.show(truncate=False)
Output:
+-----------+-----+-----+-----+-----+-------------------------------------------------+
|customer_id|col_a|col_b|col_c|col_d|reason_for_exclusion                             |
+-----------+-----+-----+-----+-----+-------------------------------------------------+
|1          |1    |5    |-3   |0    |null                                             |
|2          |0    |10   |-1   |0    |col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0|
|3          |0    |10   |-4   |1    |col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0|
+-----------+-----+-----+-----+-----+-------------------------------------------------+
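If you then also need the selected and excluded rows as separate DataFrames, a small follow-up sketch reusing filtered_df from above (the variable names are just illustrative):
from pyspark.sql.functions import col

kept_df = filtered_df.filter(col("reason_for_exclusion").isNull())
excluded_df = filtered_df.filter(col("reason_for_exclusion").isNotNull())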
EDIT:
Now, if you really want to display only the conditions that failed, you can turn each condition into a separate column and evaluate it with a DataFrame select. You then have to check the columns that evaluate to False to know which condition failed.
You could name those columns <PREFIX>_<condition> so that you can identify them easily later. Here is a complete example:
from pyspark import StorageLevel
from pyspark.sql.functions import array, array_except, col, expr, lit, when

filter_expr = "col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0"
COLUMN_FILTER_PREFIX = "filter_validation_"
original_columns = [col(c) for c in df.columns]

# create a column for each condition in the filter expression
condition_columns = [expr(f).alias(COLUMN_FILTER_PREFIX + f) for f in filter_expr.split("AND")]

# evaluate each condition to True/False and persist the DF with the calculated columns
filtered_df = df.select(original_columns + condition_columns)
filtered_df = filtered_df.persist(StorageLevel.MEMORY_AND_DISK)

# get back the columns we calculated for the filter
filter_col_names = [c for c in filtered_df.columns if COLUMN_FILTER_PREFIX in c]
filter_columns = list()
for c in filter_col_names:
    filter_columns.append(
        when(~col(f"`{c}`"),
             lit(f"{c.replace(COLUMN_FILTER_PREFIX, '')}")
             )
    )

# collect the failed conditions into an array; conditions that pass produce
# null, which array_except removes
array_reason_filter = array_except(array(*filter_columns), array(lit(None)))
df_with_filter_reason = filtered_df.withColumn("reason_for_exclusion", array_reason_filter)

df_with_filter_reason.select(*original_columns, col("reason_for_exclusion")).show(truncate=False)
Output:
+-----------+-----+-----+-----+-----+----------------------+
|customer_id|col_a|col_b|col_c|col_d|reason_for_exclusion  |
+-----------+-----+-----+-----+-----+----------------------+
|1          |1    |5    |-3   |0    |[]                    |
|2          |0    |10   |-1   |0    |[col_a > 0 ]          |
|3          |0    |10   |-4   |1    |[col_a > 0 , col_d=0] |
+-----------+-----+-----+-----+-----+----------------------+
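Note the stray spaces inside the reasons above (e.g. "col_a > 0 "); they are left over from splitting the expression on the literal "AND". A hedged variant of the condition-column construction that trims them (this still assumes a flat, AND-only expression; anything with OR, parentheses, or a string literal containing "AND" would need a real parser rather than a string split):
# replacement for the condition_columns line above, stripping the whitespace
# left over from the split so column names and reasons come out clean
conditions = [c.strip() for c in filter_expr.split("AND")]
condition_columns = [expr(c).alias(COLUMN_FILTER_PREFIX + c) for c in conditions]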