How can I use multiprocessing/multithreading to read CSV files and store them in newly generated variables?

def filename(name):
    path = f'{name}.csv'
    return pd.read_csv(path)

with concurrent.futures.ProcessPoolExecutor() as executor:
    files = [
        '20190702',
        '20190703',
        '20190708',
    ]

    # list of strings which will be new variable names
    name_list = ['df_' + i.split('2019')[1] for i in files]

    # list to store new variables
    executor_list = []

    for i in range(len(files)):
        name = name_list[i]
        dataframe = executor.submit(filename, files[i])
        exec(f"{name} = {dataframe}") # Some error here!
        exec(f"executor_list.append({name})")

    for i in executor_list:
        exec(f"{i} = {i.result()}")

When I run this in Colab, I get this error:

  File "<string>", line 1
    df_0702 = <Future at 0x7f0e5b8cc3c8 state=running>
              ^
SyntaxError: invalid syntax

executor.submit returns a Future object, not the DataFrame itself, so you need to retrieve the result from the Future by calling its result() method.

import concurrent.futures
import pandas as pd

files = ['20190702', '20190703', '20190708']

# maps the intended variable name to the Future for that file
futures = {}

with concurrent.futures.ProcessPoolExecutor() as executor:
    for name in files:
        vname = 'df_' + name.split('2019')[1]
        future = executor.submit(pd.read_csv, name + '.csv')
        futures[vname] = future

for vname, f in futures.items():
    dataframe = f.result()  # blocks until the file has been read
    # do something with vname and dataframe
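
To see why the exec line in the question failed, here is a minimal sketch. The function name demo is made up for illustration, and I use ThreadPoolExecutor with the built-in pow only to keep it short (an assumption on my part; ProcessPoolExecutor.submit hands back the same kind of Future). The f-string embeds the repr of the Future, which is not valid Python source, while result() gives the actual value.

```python
from concurrent.futures import ThreadPoolExecutor


def demo():
    """Return the text an f-string builds from a Future, and the Future's value."""
    with ThreadPoolExecutor() as executor:
        future = executor.submit(pow, 2, 10)
        # The f-string embeds the Future's repr, e.g. "<Future at 0x... state=...>",
        # which is why exec() raised SyntaxError in the question.
        source = f"value = {future}"
        # result() waits for the task and returns what the function returned.
        return source, future.result()
```

Calling demo() returns the unusable "value = <Future at 0x...>" string together with 1024, the real result of pow(2, 10).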

Also, never use the exec or eval functions unless it is for debugging/testing purposes. They make your code insecure and hard to debug.
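
If the goal is only to reach each DataFrame by a name like df_0702, a dict does that without exec. A rough sketch, assuming the naming scheme from the question; variable_name and load_by_name are hypothetical helpers, and reader stands for any callable that takes a path (pd.read_csv in your case):

```python
import concurrent.futures


def variable_name(stem):
    """Map a file stem such as '20190702' to a key such as 'df_0702'."""
    return "df_" + stem.split("2019")[1]


def load_by_name(stems, reader, executor):
    """Submit reader('<stem>.csv') for every stem and return {key: result}."""
    futures = {
        variable_name(stem): executor.submit(reader, stem + ".csv")
        for stem in stems
    }
    # result() blocks until each read is done and re-raises worker exceptions.
    return {key: future.result() for key, future in futures.items()}
```

With an executor open you would call frames = load_by_name(files, pd.read_csv, executor) and then use frames['df_0702'] wherever the question wanted a variable called df_0702.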

You don't need ProcessPoolExecutor here, because your operation is I/O-bound. Spawning threads is cheaper than spawning processes, so you can use ThreadPoolExecutor instead.

import concurrent.futures
import pandas as pd

def read_file(name):
    path = f'{name}.csv'
    return path, pd.read_csv(path)

files = ['20190702', '20190703', '20190708']

futures = []

with concurrent.futures.ThreadPoolExecutor() as executor:
    for name in files:
        futures.append(executor.submit(read_file, name))

# each result is a (path, DataFrame) tuple, in the same order as files
results = [f.result() for f in futures]
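
When all you need is a list of results in the same order as the inputs, the submit loop can probably be shortened with executor.map. A small sketch; read_all is a hypothetical helper, and reader again stands for any one-argument callable such as pd.read_csv:

```python
from concurrent.futures import ThreadPoolExecutor


def read_all(paths, reader, max_workers=None):
    """Apply reader to every path in a thread pool, keeping input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in the order of `paths`, not completion order.
        return list(executor.map(reader, paths))
```

For the files above that would be read_all([name + '.csv' for name in files], pd.read_csv). If any read raises, the exception surfaces while the results are being collected into the list.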