Yet another SQL on Pandas DFs question. Help me end these once and for all

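I often pull data from different databases into pandas DataFrames, do some processing, and eventually reach a processing step that I strongly wish I could write in SQL (usually one involving joins across DFs). So I figured I'd write a quick function for it.

The function I came up with looks like this: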

import pandas as pd
import sqlite3

def SQL_on_DFs(query:str, dfs=[]) -> pd.DataFrame:
    """convenience function to write sql on arbitrary DFs
        ...execute in a temp sqlite db and return results as DF
    """
    # validation - every item in dfs list must be a dataframe
    assert all(isinstance(x, pd.DataFrame) for x in dfs)

    # if no temp.db exists here, sqlite3 will create one
    sql3conn = sqlite3.connect("temp.db")
    
    # write dfs to sqlite3 db to perform sql query on them there
    for df in dfs:
        df.to_sql(sql3conn)  # raises TypeError: the table-name argument is missing
    
    result = pd.read_sql_query(query, sql3conn)
    return result

# TEST
df_ab = pd.DataFrame({"a": [1,2,3], "b": [2,2,2]})
df_ac = pd.DataFrame({"a": [1,2,3], "c": [3,3,3]})
df_abc = pd.DataFrame({"a": [1,2,3], "b": [2,2,2], "c": [3,3,3]})

q = "select ab.*, ac.c from df_ab ab left join df_ac ac on ab.a = ac.a"
assert SQL_on_DFs(q, [df_ab, df_ac]).equals(df_abc)

Super useful, super simple. Right?! Wrong! Don't you feel like a fool? Definitely you and not me.

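df.to_sql(sql3conn) throws an error. It says it is missing 1 required positional argument: 'con', but we did pass con. What is actually missing is the other required positional argument, name: the name of the table to write to in sqlite3. Because name comes first in the signature, the connection gets bound to name and con is reported as missing. So I need a way for the "name" of df_ab to be "df_ab" and the "name" of df_ac to be "df_ac".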
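
To make the confusing message concrete, here is a minimal sketch of the failing call next to a working one. It assumes an in-memory SQLite database (":memory:") instead of temp.db so that nothing is left on disk; the variable names error_message and round_trip are only for illustration.

```python
import sqlite3

import pandas as pd

df_ab = pd.DataFrame({"a": [1, 2, 3], "b": [2, 2, 2]})
conn = sqlite3.connect(":memory:")  # assumption: in-memory DB instead of temp.db

# Passing only the connection binds it to the first parameter (`name`),
# so Python reports `con` as the missing argument.
try:
    df_ab.to_sql(conn)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Passing the table name first and the connection second works.
df_ab.to_sql("df_ab", conn, index=False)
round_trip = pd.read_sql_query("select * from df_ab", conn)
conn.close()
```

The working call is the same shape the final function needs; the open question is only where the string "df_ab" should come from.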

Now I know what you're thinking: Max Power, why don't you just change your second function argument from dfs = [df_ab, df_ac] to [(df_ab, "df_ab"), (df_ac, "df_ac")]?

Please don't make me do that. This is supposed to be a convenience function, and that looks inconvenient.

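OK, I have an answer, and this approach doesn't even need the DFs as arguments at all. It parses their names out of the SQL statement and then grabs the corresponding objects. This answer fits the following situation: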

  • Have: some Pandas DataFrames
  • Want: a function that accepts a SQL query referencing them as if they were SQL tables, executes the query, and returns the result as a DataFrame

...

Given this setup (imports and a helper function)...

import pandas as pd 
import sqlite3
from sql_metadata import Parser
from inspect import currentframe


def get_dfs_from_caller_scope(df_names: list[str]) -> dict[str, pd.DataFrame]:
    """Given a list of DF names, this returns a dict mapping df-name to df from original* scope
    
    *'original scope' in this function context meaning the caller's caller's scope
     """
    vars_in_orig_scope = currentframe().f_back.f_back.f_locals

    dfs_map = {}
    for k,v in vars_in_orig_scope.items():
        if k in df_names and isinstance(v, pd.DataFrame):
            dfs_map[k] = v

    return dfs_map
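
The two f_back hops are the part that is easy to get wrong, so here is a small stdlib-only sketch of the same frame-walking idea with plain lists standing in for DataFrames. The function names (lookup_in_callers_caller, middle, outer, outer_using_global) are hypothetical and exist only for this illustration.

```python
from inspect import currentframe


def lookup_in_callers_caller(wanted):
    """Return the wanted names found among the locals of the caller's caller."""
    # f_back once is the function that called us (`middle` below);
    # f_back twice is whoever called that function.
    frame = currentframe().f_back.f_back
    return {k: v for k, v in frame.f_locals.items() if k in wanted}


def middle(wanted):
    # Plays the role of SQL_on_DFs: it sits between the user and the helper.
    return lookup_in_callers_caller(wanted)


def outer():
    df_like = [1, 2, 3]  # local variable in the "original" scope
    unrelated = "not requested"
    return middle(["df_like", "missing_name"])


module_level = [9]


def outer_using_global():
    # `module_level` is not a local of this function, so f_locals does not see it.
    return middle(["module_level"])


found = outer()
missed = outer_using_global()
```

One consequence worth knowing: because only f_locals is inspected, a name is found only if it is a local of the calling scope. When the call happens at module level the locals are the module's globals, so that case works, but a global referenced from inside a function is not picked up.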

This function now does what I wanted:

def SQL_on_DFs(query:str)->pd.DataFrame:
    """convenience function to write sql on arbitrary DFs
        ...execute in a temp sqlite db and return results as DF
    """
    # get table-names referenced by query, which should correspond to dataframes in caller's scope
    tablenames = Parser(query).tables

    # get a mapping of those tablenames in query to the actual corresponding DataFrame objects from caller's scope 
    tablenames_and_assoc_dfs = get_dfs_from_caller_scope(tablenames)

    # if no temp.db exists here, sqlite3 will create one
    sql3conn = sqlite3.connect("temp.db")
    
    # write dfs to sqlite3 db to perform sql query on them there
    for df_name,df in tablenames_and_assoc_dfs.items():
        df.to_sql(df_name, sql3conn, index=False, if_exists='replace')
    
    result = pd.read_sql_query(query, sql3conn)
    return result
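
As written, the function leaves temp.db on disk and never closes its connection. If that matters, the write-then-query step could be sketched as follows, assuming an in-memory database is acceptable. The helper name run_query_on_frames and its frames parameter are hypothetical; frames plays the role of the name-to-DataFrame mapping built from the caller's scope.

```python
import sqlite3
from contextlib import closing

import pandas as pd


def run_query_on_frames(query: str, frames: dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Hypothetical helper: load named DataFrames into an in-memory SQLite DB and run the query."""
    # closing() guarantees the connection is closed even if the query fails.
    with closing(sqlite3.connect(":memory:")) as conn:
        for table_name, df in frames.items():
            df.to_sql(table_name, conn, index=False, if_exists="replace")
        return pd.read_sql_query(query, conn)


df_ab = pd.DataFrame({"a": [1, 2, 3], "b": [2, 2, 2]})
df_ac = pd.DataFrame({"a": [1, 2, 3], "c": [3, 3, 3]})

q = "select ab.*, ac.c from df_ab ab left join df_ac ac on ab.a = ac.a"
result = run_query_on_frames(q, {"df_ab": df_ab, "df_ac": df_ac})
```

The trade-off is that an in-memory database disappears when the connection closes, which suits a one-shot convenience call but not a workflow that wants to inspect the SQLite file afterwards.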

Test:

df_ab = pd.DataFrame({"a": [1,2,3], "b": [2,2,2]})
df_ac = pd.DataFrame({"a": [1,2,3], "c": [3,3,3]})
df_abc = pd.DataFrame({"a": [1,2,3], "b": [2,2,2], "c": [3,3,3]})

q = "select ab.*, ac.c from df_ab ab left join df_ac ac on ab.a = ac.a"
assert SQL_on_DFs(q).equals(df_abc)  # Passes