Python - 避免巨大数据集的内存错误
Python - avoiding memory error with HUGE data set
我有一个连接到 PostGreSQL 数据库的 python 程序。在这个数据库中,我有相当多的数据(大约 12 亿行)。幸运的是,我不必同时分析所有这些行。
那 12 亿行分布在几个 table 上(大约 30)。目前我正在访问一个名为 table_3 的 table,我想在其中访问具有特定“did”值的所有行(因为该列被调用)。
我使用 SQL 命令计算了行数:
SELECT count(*) FROM table_3 WHERE did='356002062376054';
其中 returns 有 1.57 亿行。
我将对所有这些行执行一些“分析”(提取 2 个特定值)并对这些值进行一些计算,然后将它们写入字典,然后将它们保存回 PostGreSQL 在不同的 table.
问题是我正在创建大量列表和字典来管理所有这些我最终 运行 内存不足,即使我使用 Python 3 64 位并且有64 GB 内存。
一些代码:
CONNECTION = psycopg2.connect('<psycopg2 formatted string>')
CURSOR = CONNECTION.cursor()
DID_LIST = ["357139052424715",
"353224061929963",
"356002064810514",
"356002064810183",
"358188051768472",
"358188050598029",
"356002061925067",
"358188056470108",
"356002062376054",
"357460064130045"]
SENSOR_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 801, 900, 901,
902, 903, 904, 905, 906, 907,
908, 909, 910, 911]
for did in did_list:
table_name = did
for sensor_id in sensor_list:
rows = get_data(did, sensor_id)
list_object = create_standard_list(sensor_id, rows) # Happens here
formatted_list = format_table_dictionary(list_object) # Or here
pushed_rows = write_to_table(table_name, formatted_list) #write_to_table method is omitted as that is not my problem.
def get_data(did, table_id):
"""Getting data from postgresql."""
table_name = "table_{0}".format(table_id)
query = """SELECT * FROM {0} WHERE did='{1}'
ORDER BY timestamp""".format(table_name, did)
CURSOR.execute(query)
CONNECTION.commit()
return CURSOR
def create_standard_list(sensor_id, data):
"""Formats DB data to dictionary"""
list_object = []
print("Create standard list")
for row in data: # data is the psycopg2 CURSOR
row_timestamp = row[2]
row_data = row[3]
temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
"data": row_data}
list_object.append(temp_object)
return list_object
def format_table_dictionary(list_dict):
"""Formats dictionary to simple data
table_name = (dates, data_count, first row)"""
print("Formatting dict to DB")
temp_today = 0
dict_list = []
first_row = {}
count = 1
for elem in list_dict:
# convert to seconds
date = datetime.fromtimestamp(elem['timestamp'] / 1000)
today = int(date.strftime('%d'))
if temp_today is not today:
if not first_row:
first_row = elem['data']
first_row_str = str(first_row)
dict_object = {"sensor_id": elem['sensor_id'],
"date": date.strftime('%d/%m-%Y'),
"reading_count": count,
# size in MB of data
"approx_data_size": (count*len(first_row_str)/1000),
"time": date.strftime('%H:%M:%S'),
"first_row": first_row}
dict_list.append(dict_object)
first_row = {}
temp_today = today
count = 0
else:
count += 1
return dict_list
我的错误发生在创建代码中标有注释的两个列表中的任何一个时。它代表我的电脑停止响应,并最终将我注销。我 运行宁 windows 10 如果这很重要的话。
我知道我用“create_standard_list”方法创建的第一个列表可以被排除,并且该代码可以是“format_table_dictionary”代码中的运行,从而避免list 在内存中有 157 个 mio 元素,但我认为我将 运行 放入的其他一些 table 也会有类似的问题,而且可能更大,所以我现在考虑优化它, 但我不确定我能做什么?
我想写入文件并没有多大帮助,因为我必须读取该文件,然后再将它放回内存中?
极简示例
我有一个table
---------------------------------------------------------------
|Row 1 | did | timestamp | data | unused value | unused value |
|Row 2 | did | timestamp | data | unused value | unused value |
....
---------------------------------
table = [{ values from above row1 }, { values from above row2},...]
connection = psycopg2.connect(<connection string>)
cursor = connection.cursor()
table = cursor.execute("""SELECT * FROM table_3 WHERE did='356002062376054'
ORDER BY timestamp""")
extracted_list = extract(table)
calculated_list = calculate(extracted_list)
... write to db ...
def extract(table):
"""extract all but unused values"""
new_list = []
for row in table:
did = row[0]
timestamp = row[1]
data = row[2]
a_dict = {'did': did, 'timestamp': timestamp, 'data': data}
new_list.append(a_dict)
return new_list
def calculate(a_list):
"""perform calculations on values"""
dict_list = []
temp_today = 0
count = 0
for row in a_list:
date = datetime.fromtimestamp(row['timestamp'] / 1000) # from ms to sec
today = int(date.strfime('%d'))
if temp_today is not today:
new_dict = {'date': date.strftime('%d/%m-%Y'),
'reading_count': count,
'time': date.strftime('%H:%M:%S')}
dict_list.append(new_dict)
return dict_list
create_standard_list()
和 format_table_dictionary()
可以构建生成器(yield
ing 每个项目而不是 return
ing 完整列表),这将停止将整个列表保存在内存中并且所以应该可以解决您的问题,例如:
def create_standard_list(sensor_id, data):
for row in data:
row_timestamp = row[2]
row_data = row[3]
temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
"data": row_data}
yield temp_object
#^ yield each item instead of appending to a list
有关 generators and the yield
keyword 的更多信息。
IIUC,您在这里尝试做的是在 Python 代码中模拟 SQL GROUP BY
表达式。这永远不会像直接在数据库中那样快速和高效。
您的示例代码似乎有一些问题,但我将其理解为:您想
对于给定 did
发生的每一天,计算每天的行数。还有,你是
对每组值的一天中的最小(或最大,或中位数,这无关紧要)时间感兴趣,即每天。
让我们建立一个小例子table(在 Oracle 上测试):
create table t1 (id number primary key, created timestamp, did number, other_data varchar2(200));
insert into t1 values (1, to_timestamp('2017-01-31 17:00:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some text');
insert into t1 values (2, to_timestamp('2017-01-31 19:53:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some more text');
insert into t1 values (3, to_timestamp('2017-02-01 08:10:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day');
insert into t1 values (4, to_timestamp('2017-02-01 15:55:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day, rainy afternoon');
insert into t1 values (5, to_timestamp('2017-02-01 15:59:00', 'YYYY-MM-DD HH24:MI:SS'), 9002, 'different did');
insert into t1 values (6, to_timestamp('2017-02-03 01:01:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'night shift');
我们有一些行,分布在几天内,9001
。 did 9002
也有一个值,我们将
忽视。现在让我们将要写入第二个 table 的行作为简单的 SELECT .. GROUP BY
:
select
count(*) cnt,
to_char(created, 'YYYY-MM-DD') day,
min(to_char(created, 'HH24:MI:SS')) min_time
from t1
where did = 9001
group by to_char(created, 'YYYY-MM-DD')
;
我们按 created
列(时间戳)的日期对所有行进行分组。我们 select
每组的行数,一天本身,以及 - 只是为了好玩 - 每组的最短时间部分
团体。结果:
cnt day min_time
2 2017-02-01 08:10:00
1 2017-02-03 01:01:00
2 2017-01-31 17:00:00
现在你有了第二个 table 作为 SELECT
。从中创建 table 很简单:
create table t2 as
select
... as above
;
HTH!
我有一个连接到 PostGreSQL 数据库的 python 程序。在这个数据库中,我有相当多的数据(大约 12 亿行)。幸运的是,我不必同时分析所有这些行。
那 12 亿行分布在几个 table 上(大约 30)。目前我正在访问一个名为 table_3 的 table,我想在其中访问具有特定“did”值的所有行(因为该列被调用)。
我使用 SQL 命令计算了行数:
SELECT count(*) FROM table_3 WHERE did='356002062376054';
其中 returns 有 1.57 亿行。
我将对所有这些行执行一些“分析”(提取 2 个特定值)并对这些值进行一些计算,然后将它们写入字典,然后将它们保存回 PostGreSQL 在不同的 table.
问题是我正在创建大量列表和字典来管理所有这些我最终 运行 内存不足,即使我使用 Python 3 64 位并且有64 GB 内存。
一些代码:
CONNECTION = psycopg2.connect('<psycopg2 formatted string>')
CURSOR = CONNECTION.cursor()
DID_LIST = ["357139052424715",
"353224061929963",
"356002064810514",
"356002064810183",
"358188051768472",
"358188050598029",
"356002061925067",
"358188056470108",
"356002062376054",
"357460064130045"]
SENSOR_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 801, 900, 901,
902, 903, 904, 905, 906, 907,
908, 909, 910, 911]
for did in did_list:
table_name = did
for sensor_id in sensor_list:
rows = get_data(did, sensor_id)
list_object = create_standard_list(sensor_id, rows) # Happens here
formatted_list = format_table_dictionary(list_object) # Or here
pushed_rows = write_to_table(table_name, formatted_list) #write_to_table method is omitted as that is not my problem.
def get_data(did, table_id):
"""Getting data from postgresql."""
table_name = "table_{0}".format(table_id)
query = """SELECT * FROM {0} WHERE did='{1}'
ORDER BY timestamp""".format(table_name, did)
CURSOR.execute(query)
CONNECTION.commit()
return CURSOR
def create_standard_list(sensor_id, data):
"""Formats DB data to dictionary"""
list_object = []
print("Create standard list")
for row in data: # data is the psycopg2 CURSOR
row_timestamp = row[2]
row_data = row[3]
temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
"data": row_data}
list_object.append(temp_object)
return list_object
def format_table_dictionary(list_dict):
"""Formats dictionary to simple data
table_name = (dates, data_count, first row)"""
print("Formatting dict to DB")
temp_today = 0
dict_list = []
first_row = {}
count = 1
for elem in list_dict:
# convert to seconds
date = datetime.fromtimestamp(elem['timestamp'] / 1000)
today = int(date.strftime('%d'))
if temp_today is not today:
if not first_row:
first_row = elem['data']
first_row_str = str(first_row)
dict_object = {"sensor_id": elem['sensor_id'],
"date": date.strftime('%d/%m-%Y'),
"reading_count": count,
# size in MB of data
"approx_data_size": (count*len(first_row_str)/1000),
"time": date.strftime('%H:%M:%S'),
"first_row": first_row}
dict_list.append(dict_object)
first_row = {}
temp_today = today
count = 0
else:
count += 1
return dict_list
我的错误发生在创建代码中标有注释的两个列表中的任何一个时。它代表我的电脑停止响应,并最终将我注销。我 运行宁 windows 10 如果这很重要的话。
我知道我用“create_standard_list”方法创建的第一个列表可以被排除,并且该代码可以是“format_table_dictionary”代码中的运行,从而避免list 在内存中有 157 个 mio 元素,但我认为我将 运行 放入的其他一些 table 也会有类似的问题,而且可能更大,所以我现在考虑优化它, 但我不确定我能做什么?
我想写入文件并没有多大帮助,因为我必须读取该文件,然后再将它放回内存中?
极简示例
我有一个table
---------------------------------------------------------------
|Row 1 | did | timestamp | data | unused value | unused value |
|Row 2 | did | timestamp | data | unused value | unused value |
....
---------------------------------
table = [{ values from above row1 }, { values from above row2},...]
connection = psycopg2.connect(<connection string>)
cursor = connection.cursor()
table = cursor.execute("""SELECT * FROM table_3 WHERE did='356002062376054'
ORDER BY timestamp""")
extracted_list = extract(table)
calculated_list = calculate(extracted_list)
... write to db ...
def extract(table):
"""extract all but unused values"""
new_list = []
for row in table:
did = row[0]
timestamp = row[1]
data = row[2]
a_dict = {'did': did, 'timestamp': timestamp, 'data': data}
new_list.append(a_dict)
return new_list
def calculate(a_list):
"""perform calculations on values"""
dict_list = []
temp_today = 0
count = 0
for row in a_list:
date = datetime.fromtimestamp(row['timestamp'] / 1000) # from ms to sec
today = int(date.strfime('%d'))
if temp_today is not today:
new_dict = {'date': date.strftime('%d/%m-%Y'),
'reading_count': count,
'time': date.strftime('%H:%M:%S')}
dict_list.append(new_dict)
return dict_list
create_standard_list()
和 format_table_dictionary()
可以构建生成器(yield
ing 每个项目而不是 return
ing 完整列表),这将停止将整个列表保存在内存中并且所以应该可以解决您的问题,例如:
def create_standard_list(sensor_id, data):
for row in data:
row_timestamp = row[2]
row_data = row[3]
temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
"data": row_data}
yield temp_object
#^ yield each item instead of appending to a list
有关 generators and the yield
keyword 的更多信息。
IIUC,您在这里尝试做的是在 Python 代码中模拟 SQL GROUP BY
表达式。这永远不会像直接在数据库中那样快速和高效。
您的示例代码似乎有一些问题,但我将其理解为:您想
对于给定 did
发生的每一天,计算每天的行数。还有,你是
对每组值的一天中的最小(或最大,或中位数,这无关紧要)时间感兴趣,即每天。
让我们建立一个小例子table(在 Oracle 上测试):
create table t1 (id number primary key, created timestamp, did number, other_data varchar2(200));
insert into t1 values (1, to_timestamp('2017-01-31 17:00:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some text');
insert into t1 values (2, to_timestamp('2017-01-31 19:53:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some more text');
insert into t1 values (3, to_timestamp('2017-02-01 08:10:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day');
insert into t1 values (4, to_timestamp('2017-02-01 15:55:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day, rainy afternoon');
insert into t1 values (5, to_timestamp('2017-02-01 15:59:00', 'YYYY-MM-DD HH24:MI:SS'), 9002, 'different did');
insert into t1 values (6, to_timestamp('2017-02-03 01:01:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'night shift');
我们有一些行,分布在几天内,9001
。 did 9002
也有一个值,我们将
忽视。现在让我们将要写入第二个 table 的行作为简单的 SELECT .. GROUP BY
:
select
count(*) cnt,
to_char(created, 'YYYY-MM-DD') day,
min(to_char(created, 'HH24:MI:SS')) min_time
from t1
where did = 9001
group by to_char(created, 'YYYY-MM-DD')
;
我们按 created
列(时间戳)的日期对所有行进行分组。我们 select
每组的行数,一天本身,以及 - 只是为了好玩 - 每组的最短时间部分
团体。结果:
cnt day min_time
2 2017-02-01 08:10:00
1 2017-02-03 01:01:00
2 2017-01-31 17:00:00
现在你有了第二个 table 作为 SELECT
。从中创建 table 很简单:
create table t2 as
select
... as above
;
HTH!