将多处理作业的中间结果转储到文件系统,稍后继续处理
Dump intermediate results of multiprocessing job to filesystem and continue with processing later on
我有一份工作使用 multiprocessing
包并通过
调用函数
resultList = pool.map(myFunction, myListOfInputParameters)
。
输入参数列表的每个条目都相互独立。
这项工作将 运行 几个小时。出于安全原因,我想以固定的时间间隔存储其间的结果,例如每小时一次。
当作业被中止并且我想根据最后可用的备份重新启动它时,我如何才能做到这一点并能够继续处理?
也许使用 pickle。在这里阅读更多内容:
https://docs.python.org/3/library/pickle.html
根据 aws_apprentice 的评论,我创建了一个完整的多处理示例,以防您不确定如何使用中间结果。这是第一次 运行 它将打印 "None" 因为没有中间结果。 运行 再次模拟重启
from multiprocessing import Process
import pickle
def proc(name):
data = None
# Load intermediate results if they exist
try:
f = open(name+'.pkl', 'rb')
data = pickle.load(f)
f.close()
except:
pass
# Do something
print(data)
data = "intermediate result for " + name
# Periodically save your intermediate results
f = open(name+'.pkl', 'wb')
pickle.dump(data, f, -1)
f.close()
processes = []
for x in range(5):
p = Process(target=proc, args=("proc"+str(x),))
p.daemon = True
p.start()
processes.append(p)
for process in processes:
process.join()
for process in processes:
process.terminate()
您也可以使用 json 如果可以以人类可读的格式输出中间结果。如果您需要将数据推送到行中,或者将 sqlite 作为数据库。
至少有两个可能的选择。
- 每次调用
myFunction
都将其输出保存到一个唯一命名的文件中。文件名应基于或链接到输入数据。使用父程序收集结果。在这种情况下 myFunction
应该 return 已完成项目的标识符。
- 使用
imap_unordered
而不是 map
。这将在结果可用时立即开始产生结果,而不是在所有处理完成后返回。让父程序保存 returned 数据并指示哪些项目已完成。
在这两种情况下,程序都必须检查以前运行时保存的数据,以便在重新启动时调整 myListOfInputParameters
。
哪个选项最好在很大程度上取决于 return 由 myFunction
编辑的数据量。如果这是一个很大的数额,那么将其传回父级会产生大量开销。在那种情况下,选项 1 可能是最好的。
由于写入磁盘相对较慢,使用选项 2 的计算可能会更快。而且父程序更容易跟踪进度。
请注意,您也可以将 imap_unordered
与选项 1 结合使用。
我有一份工作使用 multiprocessing
包并通过
resultList = pool.map(myFunction, myListOfInputParameters)
。
输入参数列表的每个条目都相互独立。
这项工作将 运行 几个小时。出于安全原因,我想以固定的时间间隔存储其间的结果,例如每小时一次。
当作业被中止并且我想根据最后可用的备份重新启动它时,我如何才能做到这一点并能够继续处理?
也许使用 pickle。在这里阅读更多内容:
https://docs.python.org/3/library/pickle.html
根据 aws_apprentice 的评论,我创建了一个完整的多处理示例,以防您不确定如何使用中间结果。这是第一次 运行 它将打印 "None" 因为没有中间结果。 运行 再次模拟重启
from multiprocessing import Process
import pickle
def proc(name):
data = None
# Load intermediate results if they exist
try:
f = open(name+'.pkl', 'rb')
data = pickle.load(f)
f.close()
except:
pass
# Do something
print(data)
data = "intermediate result for " + name
# Periodically save your intermediate results
f = open(name+'.pkl', 'wb')
pickle.dump(data, f, -1)
f.close()
processes = []
for x in range(5):
p = Process(target=proc, args=("proc"+str(x),))
p.daemon = True
p.start()
processes.append(p)
for process in processes:
process.join()
for process in processes:
process.terminate()
您也可以使用 json 如果可以以人类可读的格式输出中间结果。如果您需要将数据推送到行中,或者将 sqlite 作为数据库。
至少有两个可能的选择。
- 每次调用
myFunction
都将其输出保存到一个唯一命名的文件中。文件名应基于或链接到输入数据。使用父程序收集结果。在这种情况下myFunction
应该 return 已完成项目的标识符。 - 使用
imap_unordered
而不是map
。这将在结果可用时立即开始产生结果,而不是在所有处理完成后返回。让父程序保存 returned 数据并指示哪些项目已完成。
在这两种情况下,程序都必须检查以前运行时保存的数据,以便在重新启动时调整 myListOfInputParameters
。
哪个选项最好在很大程度上取决于 return 由 myFunction
编辑的数据量。如果这是一个很大的数额,那么将其传回父级会产生大量开销。在那种情况下,选项 1 可能是最好的。
由于写入磁盘相对较慢,使用选项 2 的计算可能会更快。而且父程序更容易跟踪进度。
请注意,您也可以将 imap_unordered
与选项 1 结合使用。