在 python 条件下使用服务器端游标读取大量数据

Reading huge data using server side cursors with conditions in python

我有一个巨大的 table(约 8 亿),我需要根据某些段条件获取数据。

数据:

d_id    month_id    sec     average Class
89      201701      S       5.98    A
73      201703      N       7.63    B
31      201708      F       6.38    P
11      201709      K       6.38    P

我有两个列表:

monthList = [201701,201702,201703]

所以 sql 查询是:

sql_query = str("""select * from dbo.table_name where month_id IN monthList;""") 

现在我想将此数据保存在服务器端游标中,并从中获取基于类列表的子集

curs = cnxn.cursor('Class')
classList = ['A','B','P']

while True:
    records = curs.fetchmany(int(1e3))
    if not records:
      break
    for record in records:
      # here I want to use the classList to subset the data , something like 
      df = pd.DataFrame()
      df.append(curs.fetchmany([cursor['Class'] == classList]))

      # And once all the records of each Class has been read create csv
      df.to_csv("ClassList.csv")

所以对于上面给出的数据: 将生成 3 个 csv: 1. ClassA.csv

d_id    month_id    sec     average Class
31      201708      F       6.38    P
11      201709      K       6.38    P

所有数据都在我使用 psycopg2 调用的 PostgreSQL 中

有人可以帮我解决以下问题: 1.这甚至可以用服务器端游标来做。 2. 我基本上需要从所有数据中创建每个 Class 的组合 csv,也基于作为列表给出的 month_id。

这不是服务器端游标的工作方式 - 它们使服务器保持状态 而客户端遍历结果集,批量获取,可能 反转遍历。好处是服务器保持状态 关于连接,以便客户端可以更有效地分配内存 (默认情况下,客户端会在允许您的代码执行之前尝试获取所有内容 迭代。对于 80 亿行,这可能会导致问题。

但要记住的关键是游标返回的数据是确定的 通过查询 - 你可以比较每一行的结果来决定做什么, 但是你还是一行一行操作,不改变返回的结果 服务器。但是...如果您滥用职权,您的 DBA 可能会怀着暴力意图追捕您 服务器...在进行多次遍历时保持 80 亿行的服务器端游标会给数据库带来 很多 的内存压力,从而减慢其他用户的速度。

通过 Pandas 对本地系统内存也是如此 - 根据您的示例,除了使用它生成 CSV 之外,您实际上没有做任何其他事情。

怎么办?

如果您只需要编写大型组合 CSV,使用 psycopg2 的本机 copy_expert 功能直接流式传输到 CSV 是可行的方法,结合服务器端游标.

我经常使用这种方法从大型数据集创建 CSV - 而 保持数据库和客户端内存平坦。它也更快 比我可以直接写的任何逐行 CSV 生成 Python.

最后,不清楚您想要 1 个 CSV 还是 3 个 CSV。您的最终评论 引用 "combined CSV",所以要做到这一点,以迈克尔的评论为基础, 尝试这样的事情:

sql = '''
copy (
    select * 
      from your_table_name 
     where month_id in (. . .) 
       and Class in (. . .)
)
to stdout
with (
  format csv, header
)'''

stmt = db.cursor('my_cursor')
with open('output.csv', 'w') as outfile:
    stmt.copy_expert(sql, outfile)

如果您确实需要 3 个单独的 CSV,您可以修改方法以执行三个 单独的通行证。

希望对您有所帮助。