写入 csv 的多重处理
Multiprocessing for writing in csv
我正在尝试将约 1.46 亿行的巨大数据集写入 CSV。我试过这个:
def paramlist():
for row in nodes.itertuples():
l = []
for row2 in ref_stops.itertuples():
l.append((row[1], row[2], row[3], row2[1],
row2[2], row2[3], row2[4], haversine(row[3], row[2], row2[3], row2[2])))
yield l
pool = multiprocessing.Pool()
pool.map(func, paramlist())
def func(params):
with open(r'big_file.csv', 'a') as f:
writer = csv.writer(f)
for row in params:
writer.writerow(row)
此代码有效,但它占用了我所有的 RAM 并中止。
我该如何优化它?
尝试以块的形式写入数据。
部分读取您的数据框(假设您从数据框写入),即根据某些块。
每个块写一次,这样执行得更快。
pool.map
将消耗整个可迭代对象,然后再将其部分提交给池的工作人员。这就是为什么你会遇到内存问题。
您应该使用 pool.imap
instead in order to avoid this. See this post 进行全面解释。
话虽这么说,但我真诚地怀疑多处理是否会以您编写程序的方式加速您的程序,因为瓶颈是磁盘 I/O。一遍又一遍地打开、附加和关闭文件几乎不比一次顺序写入快。并行写入单个文件是不可能的。
假设 l
的生成需要一些时间,如果您这样编写程序可能会加速:
from contextlib import closing
import multiprocessing
import csv
import pandas as pd
import numpy as np
# Just for testing
ref_stops = pd.DataFrame(np.arange(100).reshape((-1, 5)))
nodes = pd.DataFrame(np.arange(400).reshape((-1, 4)))
def haversine(a, b, c, d):
return a*b*c*d
# This function will be executed by the workers
def join_rows(row):
row_list = []
# join row with all rows from `ref_stops` and compute haversine
for row2 in ref_stops.itertuples():
row_list.append((row[1], row[2], row[3],
row2[1], row2[2], row2[3], row2[4],
haversine(row[3], row[2], row2[3], row2[2])))
return row_list
def main():
with closing(multiprocessing.Pool()) as pool:
# joined_rows will contain lists of joined rows in arbitrary order.
# use name=None so we get proper tuples, pandas named tuples cannot be pickled, see https://github.com/pandas-dev/pandas/issues/11791
joined_rows = pool.imap_unordered(join_rows, nodes.itertuples(name=None))
# open file and write out all rows from incoming lists of rows
with open(r'big_file.csv', 'w') as f:
writer = csv.writer(f)
for row_list in joined_rows:
writer.writerows(row_list)
if __name__ == '__main__':
main()
我假设你不关心顺序,否则你不会一开始就选择多处理,对吧?
这样,生成行列表的不是主进程,而是工作进程。一旦一个工作进程完成了一个列表,它就会 return 它到主进程,然后主进程将其条目附加到文件中。然后工作人员获取一个新行并开始构建另一个列表。
通常在程序中使用更多 pandas 功能可能会更好(我假设您使用 pandas 数据帧是因为 itertuples
)。例如,您可以创建一个新的 Dataframe 而不是行列表,并使 haversine
与 pandas.Series
对象兼容,这样您就不必在每个条目上都调用它。
我正在尝试将约 1.46 亿行的巨大数据集写入 CSV。我试过这个:
def paramlist():
for row in nodes.itertuples():
l = []
for row2 in ref_stops.itertuples():
l.append((row[1], row[2], row[3], row2[1],
row2[2], row2[3], row2[4], haversine(row[3], row[2], row2[3], row2[2])))
yield l
pool = multiprocessing.Pool()
pool.map(func, paramlist())
def func(params):
with open(r'big_file.csv', 'a') as f:
writer = csv.writer(f)
for row in params:
writer.writerow(row)
此代码有效,但它占用了我所有的 RAM 并中止。
我该如何优化它?
尝试以块的形式写入数据。 部分读取您的数据框(假设您从数据框写入),即根据某些块。 每个块写一次,这样执行得更快。
pool.map
将消耗整个可迭代对象,然后再将其部分提交给池的工作人员。这就是为什么你会遇到内存问题。
您应该使用 pool.imap
instead in order to avoid this. See this post 进行全面解释。
话虽这么说,但我真诚地怀疑多处理是否会以您编写程序的方式加速您的程序,因为瓶颈是磁盘 I/O。一遍又一遍地打开、附加和关闭文件几乎不比一次顺序写入快。并行写入单个文件是不可能的。
假设 l
的生成需要一些时间,如果您这样编写程序可能会加速:
from contextlib import closing
import multiprocessing
import csv
import pandas as pd
import numpy as np
# Just for testing
ref_stops = pd.DataFrame(np.arange(100).reshape((-1, 5)))
nodes = pd.DataFrame(np.arange(400).reshape((-1, 4)))
def haversine(a, b, c, d):
return a*b*c*d
# This function will be executed by the workers
def join_rows(row):
row_list = []
# join row with all rows from `ref_stops` and compute haversine
for row2 in ref_stops.itertuples():
row_list.append((row[1], row[2], row[3],
row2[1], row2[2], row2[3], row2[4],
haversine(row[3], row[2], row2[3], row2[2])))
return row_list
def main():
with closing(multiprocessing.Pool()) as pool:
# joined_rows will contain lists of joined rows in arbitrary order.
# use name=None so we get proper tuples, pandas named tuples cannot be pickled, see https://github.com/pandas-dev/pandas/issues/11791
joined_rows = pool.imap_unordered(join_rows, nodes.itertuples(name=None))
# open file and write out all rows from incoming lists of rows
with open(r'big_file.csv', 'w') as f:
writer = csv.writer(f)
for row_list in joined_rows:
writer.writerows(row_list)
if __name__ == '__main__':
main()
我假设你不关心顺序,否则你不会一开始就选择多处理,对吧?
这样,生成行列表的不是主进程,而是工作进程。一旦一个工作进程完成了一个列表,它就会 return 它到主进程,然后主进程将其条目附加到文件中。然后工作人员获取一个新行并开始构建另一个列表。
通常在程序中使用更多 pandas 功能可能会更好(我假设您使用 pandas 数据帧是因为 itertuples
)。例如,您可以创建一个新的 Dataframe 而不是行列表,并使 haversine
与 pandas.Series
对象兼容,这样您就不必在每个条目上都调用它。