在 Pandas 中使用多处理读取 csv 文件的最简单方法

Easiest way to read csv files with multiprocessing in Pandas

这是我的问题。
一堆 .csv 文件(或其他文件)。 Pandas 是读取它们并保存为 Dataframe 格式的简单方法。但是当文件量很大的时候,我想用multiprocessing来读取文件以节省一些时间。

我的早期尝试

我手动将文件分成不同的路径。分别使用:

os.chdir("./task_1")
files = os.listdir('.')
files.sort()
for file in files:
    filename,extname = os.path.splitext(file)
    if extname == '.csv':
        f = pd.read_csv(file)
        df = (f.VALUE.as_matrix()).reshape(75,90)   

然后合并。

如何 运行 他们与 pool 一起解决我的问题?
如有任何建议,我们将不胜感激!

如果您不反对使用其他库,则可以使用 Graphlab 的 sframe。这会创建一个类似于数据帧的对象,如果性能是一个大问题,它可以非常快速地读取数据。

dask 库旨在解决您的问题。

使用Pool:

import os
import pandas as pd 
from multiprocessing import Pool

# wrap your csv importer in a function that can be mapped
def read_csv(filename):
    'converts a filename to a pandas dataframe'
    return pd.read_csv(filename)


def main():

    # get a list of file names
    files = os.listdir('.')
    file_list = [filename for filename in files if filename.split('.')[1]=='csv']

    # set up your pool
    with Pool(processes=8) as pool: # or whatever your hardware can support

        # have your pool map the file names to dataframes
        df_list = pool.map(read_csv, file_list)

        # reduce the list of dataframes to a single dataframe
        combined_df = pd.concat(df_list, ignore_index=True)

if __name__ == '__main__':
    main()

我没有让 map/map_async 工作, 但设法与 apply_async.

合作

两种可能的方式(不知道哪个更好):

  • A) 在 末尾连接
  • B) 在
  • 期间连接

我发现 glob 很容易 listfitler 目录中的文件

from glob import glob
import pandas as pd
from multiprocessing import Pool

folder = "./task_1/" # note the "/" at the end
file_list = glob(folder+'*.xlsx')

def my_read(filename):
    f = pd.read_csv(filename)
    return (f.VALUE.as_matrix()).reshape(75,90)

#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during

def DF_LIST_append(result):
    #DF_LIST.append(result) # A) end
    global DF # B) during
    DF = pd.concat([DF,result], ignore_index=True) # B) during

pool = Pool(processes=8)

for file in file_list:
    pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)

pool.close()
pool.join()

#DF = pd.concat(DF_LIST, ignore_index=True) # A) end

print(DF.shape)