在 pyspark 数据帧中读取时过滤雪花行 table

Filter rows of snowflake table while reading in pyspark dataframe

我有一片巨大的雪花table。我想对 pyspark 中的 table 进行一些转换。我的雪花 table 有一个名为 'snapshot' 的列。我只想读取pyspark dataframe中的当前快照数据并对过滤后的数据进行转换。

那么,有没有办法在读取 spark 数据帧中的雪花 table 时应用过滤行(我不想读取内存中的整个雪花 table,因为它不是高效)还是我需要读取整个雪花 table(在 spark 数据帧中)然后应用过滤器来获取最新的快照,如下所示?

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
snowflake_database="********"
snowflake_schema="********"
source_table_name="********"
snowflake_options = {
    "sfUrl": "********",
    "sfUser": "********",
    "sfPassword": "********",
    "sfDatabase": snowflake_database,
    "sfSchema": snowflake_schema,
    "sfWarehouse": "COMPUTE_WH"
}
df = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**snowflake_options) \
    .option("dbtable",snowflake_database+"."+snowflake_schema+"."+source_table_name) \
    .load()
df = df.where(df.snapshot == current_timestamp()).collect()

有一些过滤器形式(filterwhere Spark DataFrame 的功能)Spark 不会传递给 Spark Snowflake 连接器.这意味着,在某些情况下,您可能会获得比预期更多的记录。

最安全的方法是直接使用 SQL 查询:

df = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**snowflake_options) \
    .option("query","SELECT X,Y,Z FROM TABLE1 WHERE SNAPSHOT==CURRENT_TIMESTAMP()") \
    .load()

当然,如果你想使用Spark DataFrame的filter/where功能,查看Snowflake中的Query History UI 查看生成的查询是否应用了正确的过滤器。