在 Pandas 中使用多处理读取 csv 文件的最简单方法
Easiest way to read csv files with multiprocessing in Pandas
这是我的问题。
一堆 .csv 文件(或其他文件)。 Pandas 是读取它们并保存为 Dataframe
格式的简单方法。但是当文件量很大的时候,我想用multiprocessing来读取文件以节省一些时间。
我的早期尝试
我手动将文件分成不同的路径。分别使用:
os.chdir("./task_1")
files = os.listdir('.')
files.sort()
for file in files:
filename,extname = os.path.splitext(file)
if extname == '.csv':
f = pd.read_csv(file)
df = (f.VALUE.as_matrix()).reshape(75,90)
然后合并。
如何 运行 他们与 pool
一起解决我的问题?
如有任何建议,我们将不胜感激!
如果您不反对使用其他库,则可以使用 Graphlab 的 sframe。这会创建一个类似于数据帧的对象,如果性能是一个大问题,它可以非常快速地读取数据。
dask
库旨在解决您的问题。
使用Pool
:
import os
import pandas as pd
from multiprocessing import Pool
# wrap your csv importer in a function that can be mapped
def read_csv(filename):
'converts a filename to a pandas dataframe'
return pd.read_csv(filename)
def main():
# get a list of file names
files = os.listdir('.')
file_list = [filename for filename in files if filename.split('.')[1]=='csv']
# set up your pool
with Pool(processes=8) as pool: # or whatever your hardware can support
# have your pool map the file names to dataframes
df_list = pool.map(read_csv, file_list)
# reduce the list of dataframes to a single dataframe
combined_df = pd.concat(df_list, ignore_index=True)
if __name__ == '__main__':
main()
我没有让 map/map_async 工作,
但设法与 apply_async.
合作
两种可能的方式(不知道哪个更好):
- A) 在 末尾连接
- B) 在
期间连接
我发现 glob 很容易 list 和 fitler 目录中的文件
from glob import glob
import pandas as pd
from multiprocessing import Pool
folder = "./task_1/" # note the "/" at the end
file_list = glob(folder+'*.xlsx')
def my_read(filename):
f = pd.read_csv(filename)
return (f.VALUE.as_matrix()).reshape(75,90)
#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during
def DF_LIST_append(result):
#DF_LIST.append(result) # A) end
global DF # B) during
DF = pd.concat([DF,result], ignore_index=True) # B) during
pool = Pool(processes=8)
for file in file_list:
pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)
pool.close()
pool.join()
#DF = pd.concat(DF_LIST, ignore_index=True) # A) end
print(DF.shape)
这是我的问题。
一堆 .csv 文件(或其他文件)。 Pandas 是读取它们并保存为 Dataframe
格式的简单方法。但是当文件量很大的时候,我想用multiprocessing来读取文件以节省一些时间。
我的早期尝试
我手动将文件分成不同的路径。分别使用:
os.chdir("./task_1")
files = os.listdir('.')
files.sort()
for file in files:
filename,extname = os.path.splitext(file)
if extname == '.csv':
f = pd.read_csv(file)
df = (f.VALUE.as_matrix()).reshape(75,90)
然后合并。
如何 运行 他们与 pool
一起解决我的问题?
如有任何建议,我们将不胜感激!
如果您不反对使用其他库,则可以使用 Graphlab 的 sframe。这会创建一个类似于数据帧的对象,如果性能是一个大问题,它可以非常快速地读取数据。
dask
库旨在解决您的问题。
使用Pool
:
import os
import pandas as pd
from multiprocessing import Pool
# wrap your csv importer in a function that can be mapped
def read_csv(filename):
'converts a filename to a pandas dataframe'
return pd.read_csv(filename)
def main():
# get a list of file names
files = os.listdir('.')
file_list = [filename for filename in files if filename.split('.')[1]=='csv']
# set up your pool
with Pool(processes=8) as pool: # or whatever your hardware can support
# have your pool map the file names to dataframes
df_list = pool.map(read_csv, file_list)
# reduce the list of dataframes to a single dataframe
combined_df = pd.concat(df_list, ignore_index=True)
if __name__ == '__main__':
main()
我没有让 map/map_async 工作, 但设法与 apply_async.
合作两种可能的方式(不知道哪个更好):
- A) 在 末尾连接
- B) 在 期间连接
我发现 glob 很容易 list 和 fitler 目录中的文件
from glob import glob
import pandas as pd
from multiprocessing import Pool
folder = "./task_1/" # note the "/" at the end
file_list = glob(folder+'*.xlsx')
def my_read(filename):
f = pd.read_csv(filename)
return (f.VALUE.as_matrix()).reshape(75,90)
#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during
def DF_LIST_append(result):
#DF_LIST.append(result) # A) end
global DF # B) during
DF = pd.concat([DF,result], ignore_index=True) # B) during
pool = Pool(processes=8)
for file in file_list:
pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)
pool.close()
pool.join()
#DF = pd.concat(DF_LIST, ignore_index=True) # A) end
print(DF.shape)