Dynamic filter based on the config and user: PySpark

I need to apply filters dynamically, coming from two places: (1) the config and (2) user/job input.

Filters to apply: (1) any filters listed in config.filters and (2) a user-supplied filter based on a number of days relative to the run date, i.e. rundate - history_days. If the user passes a rundate of 2020-01-20 and a history_days of 5, the final filter should be:

 cust=123 and (activity_day between rundate-5 and rundate)

I was able to achieve this with a two-step filter: (1) filter from the config using the SQL-style string, df.filter(config['filters']), and (2) apply a second round of filtering on top of (1): activity_day >= date_sub(rundate, history_days) & activity_day < rundate.
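For reference, a minimal sketch of that two-step approach (assuming `rundate` and `history_days` arrive as user/job input, and that the filter string has already been pulled out of the config):

from pyspark.sql import functions as F

rundate = "2020-01-20"   # user input
history_days = 5         # user input

# Step 1: config-driven filter, passed as a SQL expression string
step_1 = df.filter("cust=123")

# Step 2: user-driven date window applied on top of step 1
step_2 = step_1.filter(
    (F.col("activity_day") >= F.date_sub(F.to_date(F.lit(rundate)), history_days))
    & (F.col("activity_day") < F.to_date(F.lit(rundate)))
)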

Is there any way to merge the two-step filter into one, so that I can maintain both filters in the config and somehow substitute in the user input?

Data:

df = spark.createDataFrame(
        [
          (123,"2020-01-01"),
          (124,"2020-01-01"),
          (123,"2019-01-01")
        ],
        ("cust", "activity_day")
    )

Config:

config = """
                [ {
                      "source":"df_1",
                      "filters":"cust=123",
                  }
                ]

You can first parse your config to extract the filter condition `filters` and the corresponding value into a dictionary, and add the `run_date` condition to it to further filter the `DataFrame`.

Data preparation

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

sparkDF = spark.createDataFrame(
        [
          (123,"2020-01-01"),
          (124,"2020-01-01"),
          (123,"2019-01-01")
        ],
        ("cust", "activity_day")
    )

sparkDF.show()

+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
| 124|  2020-01-01|
| 123|  2019-01-01|
+----+------------+

Parse the config and build the filter condition

config = [{"source": "df_1", "filters": "cust=123"}]

filter_dict = {}

for row in config:
    if 'filters' in row:
        # assumes a simple "col=value" filter string
        key, value = row['filters'].split("=")
        filter_dict[key] = value

filter_dict

{'cust': '123'}

run_date = "2020-01-01"

upper_range = F.to_date(F.lit(run_date))
lower_range = F.date_add(upper_range,-5)

secondary_condn = (F.col('activity_day').between(lower_range,upper_range))

final_condn = (F.col(column) == filter_dict[column]) & (secondary_condn)

Filter the DataFrame

sparkDF.filter(final_condn).show()

+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
+----+------------+
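To address the original ask of collapsing the two steps into one, the pieces above can be wrapped in a single helper; a minimal sketch (the function name `apply_filters` and its parameters are illustrative, not part of the original answer):

def apply_filters(df, config, run_date, history_days):
    """Build one combined condition from the config filters plus the
    user-supplied run_date/history_days window, then filter once."""
    upper_range = F.to_date(F.lit(run_date))
    lower_range = F.date_add(upper_range, -history_days)
    condn = F.col("activity_day").between(lower_range, upper_range)
    for row in config:
        if "filters" in row:
            key, value = row["filters"].split("=")
            condn = condn & (F.col(key) == value)
    return df.filter(condn)

apply_filters(sparkDF, config, run_date="2020-01-01", history_days=5).show()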
SQL approach for handling multiple conditions

You can use `createOrReplaceTempView` for more complex filtering. The idea is to build the `WHERE` filter clause and merge it into a SQL query to filter the rows.

sparkDF = spark.createDataFrame(
        [
          (123,"2020-01-01",1,"NL"),
          (124,"2020-01-01",0,"US"),
          (123,"2019-01-01",1,"IN"),
          (124,"2020-01-02",0,"NL"),
        ],
        ("cust", "activity_day","is_deleted","country")
    )

# register the DataFrame as a temp view so it can be queried with SQL
sparkDF.createOrReplaceTempView("customer_activity")

sparkDF.show()

+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 124|  2020-01-01|         0|     US|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+

where_filter = ""
logical_flag = " OR "

for i,row in enumerate(config):
    if 'filters' in row:
        if i == 0:
            where_filter += row['filters']
        else:
            where_filter += logical_flag + "(" + row['filters'] + ")"

where_filter # O/P -- 'cust=123 OR (is_deleted != 1 AND country="NL")'

if where_filter != "":
    sql.sql(f"""
        SELECT *
        FROM customer_activity
        WHERE {where_filter}
    """).show()

+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+