How to store all the output before multiprocessing finishes?
I want to run multiprocessing in Python.
Here is an example:
def myFunction(name,age):
    output = paste(name,age)
    return output

names = ["A","B","C"]
ages = ["1","2","3"]
with mp.Pool(processes=no_cpus) as pool:
    results = pool.starmap(myFunction,zip(names,ages))
results_table = pd.concat(results)
results_table.to_csv(file,sep="\t",index=False)
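In the real case, myFunction takes a long time. Sometimes I have to interrupt the run and start again. But results is only written to the output file once all of pool.starmap has finished. How can I store the intermediate/cached results before it finishes?
I don't want to change myFunction from return to .to_csv().
Thanks!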
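Instead of map (or starmap), use the method imap, which returns an iterator. Iterating over it yields the results one at a time, each as soon as it is available (i.e. as soon as myFunction has returned it). The results are still delivered in input order, though, so a slow early task holds back the results behind it. If you don't care about the order, use imap_unordered. Note that imap passes a single argument to the worker, so the function below takes a tuple and unpacks it.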
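The ordering difference between the two methods can be seen in a minimal sketch. The worker slow_square and the helper collect are hypothetical names made up for this illustration; the sleep only forces later inputs to finish first.

```python
import multiprocessing as mp
import time


def slow_square(x):
    # Hypothetical worker: larger inputs sleep less, so tasks tend to
    # complete in the reverse of the order they were submitted.
    time.sleep(0.1 * (3 - x))
    return x * x


def collect(use_unordered):
    # Gather results in whatever order the iterator yields them.
    with mp.Pool(processes=3) as pool:
        method = pool.imap_unordered if use_unordered else pool.imap
        return list(method(slow_square, [0, 1, 2]))


if __name__ == '__main__':
    print(collect(False))  # imap: results follow the input order
    print(collect(True))   # imap_unordered: completion order, may vary per run
```

With imap the first result is only delivered after the slowest task has finished, whereas imap_unordered hands over each result as soon as its worker is done.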
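As each DataFrame is returned by the iteration, it is written to the CSV file, with or without a header depending on whether it is the first result to be processed.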
import pandas as pd
import multiprocessing as mp

def paste(name, age):
    return pd.DataFrame([[name, age]], columns=['Name', 'Age'])

def myFunction(t):
    name, age = t # unpack passed tuple
    output = paste(name, age)
    return output

# Required for Windows:
if __name__ == '__main__':
    names = ["A","B","C"]
    ages = ["1","2","3"]
    no_cpus = min(len(names), mp.cpu_count())
    csv_file = 'test.txt'
    with mp.Pool(processes=no_cpus) as pool:
        # Results from imap must be iterated
        for index, result in enumerate(pool.imap(myFunction, zip(names,ages))):
            if index == 0:
                # First return value
                header = True
                open_flags = "w"
            else:
                header = False
                open_flags = "a"
            with open(csv_file, open_flags, newline='') as f:
                result.to_csv(f, sep="\t", index=False, header=header)
Output of test.txt:
Name Age
A 1
B 2
C 3
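If row order in the file does not matter, the same idea can be sketched with imap_unordered, so that a slow task never delays writing the results that are already done. This is only a sketch: write_as_completed is a hypothetical helper name, and keeping one file handle open and flushing after every row is a design choice of this example, not something the code above does.

```python
import multiprocessing as mp

import pandas as pd


def paste(name, age):
    return pd.DataFrame([[name, age]], columns=['Name', 'Age'])


def myFunction(t):
    name, age = t  # unpack passed tuple
    return paste(name, age)


def write_as_completed(pairs, csv_file, processes=2):
    # Hypothetical helper: write each DataFrame to csv_file as soon as its
    # worker finishes, so an interrupted run keeps the rows written so far.
    with mp.Pool(processes=processes) as pool, \
            open(csv_file, 'w', newline='') as f:
        for index, result in enumerate(pool.imap_unordered(myFunction, pairs)):
            # Only the first result written carries the header row.
            result.to_csv(f, sep='\t', index=False, header=(index == 0))
            f.flush()  # push the row out of Python's buffer right away


if __name__ == '__main__':
    names = ["A", "B", "C"]
    ages = ["1", "2", "3"]
    write_as_completed(list(zip(names, ages)), 'test.txt')
```

The rows may appear in a different order from run to run, so include an identifying column (here Name) if the results need to be sorted or matched up afterwards.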