Python 使用来自 CSV 的列表输入进行多处理
Python multiprocessing with List input from CSV
尝试使用来自 CSV 的列表输入进行多处理
我有一个将列表作为输入的函数。
我目前正在传递来自 CSV 文件的输入,其中每一行都是一个列表。
但是,我不想从 CSV 中逐行 运行 处理函数,而是希望它对 CSV 中的 x 行(比如 10 行)进行多处理,并一次 运行 函数 10 次。
我见过将单个变量传递给函数的多处理示例。
但是,我在尝试从 CSV 多处理多个列表时遇到问题。
import csv
InputFile = "SampleCSV.csv"
def My_Function(row):
print(row)
# Do domething else
if __name__ == '__main__':
with open(InputFile, 'r') as csvFile:
reader = csv.reader(csvFile)
next(reader) # to skip the header row
for row in reader:
a = row
My_Function(row)
csvFile.close()
multiprocessing
模块
您需要有一个与您的 CPU 数量一样大的多处理池。
因为 csv.reader
不支持(据我所知)不止一行的迭代,你需要实现某种缓冲区来累积 cpu_count
的数量行,准备就绪后,立即将负载分配给多个 CPU:
import csv
import multiprocessing
import os
cpu_count = os.cpu_count()
def func(row):
print(row)
with open('test.csv', newline='') as f:
reader = csv.reader(f)
with multiprocessing.Pool(cpu_count) as p:
buffer = []
for row_num, row in enumerate(reader):
if row_num == 0:
continue
buffer.append(row)
# if read enough rows -> do work
if row_num % cpu_count == 0:
p.map(func, buffer)
# don't forget to clear the buffer
buffer = []
else:
# process leftover rows
p.map(func, buffer)
尝试使用来自 CSV 的列表输入进行多处理
我有一个将列表作为输入的函数。 我目前正在传递来自 CSV 文件的输入,其中每一行都是一个列表。 但是,我不想从 CSV 中逐行 运行 处理函数,而是希望它对 CSV 中的 x 行(比如 10 行)进行多处理,并一次 运行 函数 10 次。 我见过将单个变量传递给函数的多处理示例。 但是,我在尝试从 CSV 多处理多个列表时遇到问题。
import csv
InputFile = "SampleCSV.csv"
def My_Function(row):
print(row)
# Do domething else
if __name__ == '__main__':
with open(InputFile, 'r') as csvFile:
reader = csv.reader(csvFile)
next(reader) # to skip the header row
for row in reader:
a = row
My_Function(row)
csvFile.close()
multiprocessing
模块
您需要有一个与您的 CPU 数量一样大的多处理池。
因为 csv.reader
不支持(据我所知)不止一行的迭代,你需要实现某种缓冲区来累积 cpu_count
的数量行,准备就绪后,立即将负载分配给多个 CPU:
import csv
import multiprocessing
import os
cpu_count = os.cpu_count()
def func(row):
print(row)
with open('test.csv', newline='') as f:
reader = csv.reader(f)
with multiprocessing.Pool(cpu_count) as p:
buffer = []
for row_num, row in enumerate(reader):
if row_num == 0:
continue
buffer.append(row)
# if read enough rows -> do work
if row_num % cpu_count == 0:
p.map(func, buffer)
# don't forget to clear the buffer
buffer = []
else:
# process leftover rows
p.map(func, buffer)