Dynamic filter based on the config and user: PySpark
Filters need to be applied dynamically, coming from two places: (1) the config and (2) user/job input.
Filters to apply: (1) whatever filters are listed in config.filters, and (2) a user-supplied filter, i.e. a number of days relative to the run date: rundate-history_days.
If the user passes a run date of 2020-01-20 and history_days of 5, the final filter should be:
cust=123 and (activity_day between rundate and rundate-5)
I was able to achieve this with a two-step filter (see the sketch after the config below):
(1) filter from the config using the SQL-style string: df.filter(config['config'])
(2) a second round of filtering on top of (1):
activity_day>=date_sub(rundate,history_days) & activity_day<rundate
Is there any way to combine the two steps into one, so that I can maintain both filters in the config and somehow substitute the user input?
Data:
df = spark.createDataFrame(
    [
        (123, "2020-01-01"),
        (124, "2020-01-01"),
        (123, "2019-01-01")
    ],
    ("cust", "activity_day")
)
Config:
config = """
[ {
"source":"df_1",
"filters":"cust=123",
}
]
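For reference, a minimal sketch of the two-step approach described above (the rundate and history_days values are assumed user/job inputs):
import json
from pyspark.sql import functions as F

cfg = json.loads(config)              # parse the JSON config string above
step1 = df.filter(cfg[0]['filters'])  # step 1: config-driven filter, e.g. "cust=123"

rundate = "2020-01-20"                # assumed user/job input
history_days = 5                      # assumed user/job input
step2 = step1.filter(                 # step 2: user-driven date window on top of step 1
    (F.col('activity_day') >= F.date_sub(F.lit(rundate), history_days))
    & (F.col('activity_day') < F.lit(rundate))
)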
You can first parse your config to extract the filter condition `filters` and the corresponding value into a dictionary, and then add the `run_date` condition to it to further filter the `DataFrame`.
Data Preparation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sql = SparkSession.builder.getOrCreate()  # "sql" is the SparkSession throughout

sparkDF = sql.createDataFrame(
    [
        (123, "2020-01-01"),
        (124, "2020-01-01"),
        (123, "2019-01-01")
    ],
    ("cust", "activity_day")
)
sparkDF.show()
+----+------------+
|cust|activity_day|
+----+------------+
| 123| 2020-01-01|
| 124| 2020-01-01|
| 123| 2019-01-01|
+----+------------+
Parsing the Config and Generating the Filter Condition
config = [{"source": "df_1", "filters": "cust=123"}]

filter_dict = {}
for row in config:
    if 'filters' in row:
        key, value = row['filters'].split("=")  # split "cust=123" into column name and value
        filter_dict[key] = value
filter_dict
{'cust': '123'}
run_date = "2020-01-01"
upper_range = F.to_date(F.lit(run_date))
lower_range = F.date_add(upper_range, -5)  # run_date minus history_days (5)
secondary_condn = F.col('activity_day').between(lower_range, upper_range)

# AND each config-derived equality filter onto the date-range condition
final_condn = secondary_condn
for column in filter_dict:
    final_condn = final_condn & (F.col(column) == filter_dict[column])
Filtering the DataFrame
sparkDF.filter(final_condn).show()
+----+------------+
|cust|activity_day|
+----+------------+
| 123| 2020-01-01|
+----+------------+
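If the config can carry several such filters, the same idea can be wrapped in a small helper so the user input stays explicit and the whole thing remains a single filter call (a sketch; the build_filter name and its parameters are illustrative, not from the original answer):
def build_filter(filter_dict, run_date, history_days):
    """Combine the config equality filters with the user-supplied date window."""
    upper_range = F.to_date(F.lit(run_date))
    lower_range = F.date_add(upper_range, -history_days)
    condn = F.col('activity_day').between(lower_range, upper_range)
    for column, value in filter_dict.items():
        condn = condn & (F.col(column) == value)
    return condn

sparkDF.filter(build_filter(filter_dict, "2020-01-01", 5)).show()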
SQL Approach for Handling Multiple Conditions
You can use createOrReplaceTempView for more complex filtering. The idea is to build the WHERE filter and incorporate it into a SQL query that filters the rows.
sparkDF = sql.createDataFrame(
    [
        (123, "2020-01-01", 1, "NL"),
        (124, "2020-01-01", 0, "US"),
        (123, "2019-01-01", 1, "IN"),
        (124, "2020-01-02", 0, "NL"),
    ],
    ("cust", "activity_day", "is_deleted", "country")
)
sparkDF.show()
+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123| 2020-01-01| 1| NL|
| 124| 2020-01-01| 0| US|
| 123| 2019-01-01| 1| IN|
| 124| 2020-01-02| 0| NL|
+----+------------+----------+-------+
# The output below assumes config now carries a second filter entry, e.g.:
config = [
    {"source": "df_1", "filters": "cust=123"},
    {"source": "df_2", "filters": 'is_deleted != 1 AND country="NL"'},
]

where_filter = ""
logical_flag = " OR "
for i, row in enumerate(config):
    if 'filters' in row:
        if i == 0:
            where_filter += row['filters']  # first filter is taken as-is
        else:
            where_filter += logical_flag + "(" + row['filters'] + ")"  # later ones OR-ed in parentheses

where_filter # O/P -- 'cust=123 OR (is_deleted != 1 AND country="NL")'
sparkDF.createOrReplaceTempView("customer_activity")

if where_filter != "":
    sql.sql(f"""
    SELECT *
    FROM customer_activity
    WHERE {where_filter}
    """).show()
+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123| 2020-01-01| 1| NL|
| 123| 2019-01-01| 1| IN|
| 124| 2020-01-02| 0| NL|
+----+------------+----------+-------+
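To merge the two steps into one, as the question asks, the user-supplied run-date window can simply be appended to the generated WHERE string, so the config filters and the user filter are applied in a single query (a sketch; the run_date and history_days values here are assumed inputs):
run_date = "2020-01-20"
history_days = 5

# date_sub(start_date, num_days) is a built-in Spark SQL function
date_filter = (
    f"activity_day >= date_sub('{run_date}', {history_days}) "
    f"AND activity_day < '{run_date}'"
)
full_filter = f"({where_filter}) AND ({date_filter})"

sql.sql(f"""
SELECT *
FROM customer_activity
WHERE {full_filter}
""").show()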