将多处理作业的中间结果转储到文件系统,稍后继续处理

Dump intermediate results of multiprocessing job to filesystem and continue with processing later on

我有一份工作使用 multiprocessing 包并通过

调用函数

resultList = pool.map(myFunction, myListOfInputParameters)

输入参数列表的每个条目都相互独立。

这项工作将 运行 几个小时。出于安全原因,我想以固定的时间间隔存储其间的结果,例如每小时一次。

当作业被中止并且我想根据最后可用的备份重新启动它时,我如何才能做到这一点并能够继续处理?

也许使用 pickle。在这里阅读更多内容:

https://docs.python.org/3/library/pickle.html

根据 aws_apprentice 的评论,我创建了一个完整的多处理示例,以防您不确定如何使用中间结果。这是第一次 运行 它将打印 "None" 因为没有中间结果。 运行 再次模拟重启

from multiprocessing import Process
import pickle

def proc(name):
  data = None

  # Load intermediate results if they exist
  try:
    f = open(name+'.pkl', 'rb')
    data = pickle.load(f)
    f.close()
  except:
    pass

  # Do something
  print(data)
  data = "intermediate result for " + name

  # Periodically save your intermediate results
  f = open(name+'.pkl', 'wb')
  pickle.dump(data, f, -1)
  f.close()

processes = []
for x in range(5):
  p = Process(target=proc, args=("proc"+str(x),))
  p.daemon = True
  p.start()
  processes.append(p)

for process in processes:
  process.join()

for process in processes:
  process.terminate()

您也可以使用 json 如果可以以人类可读的格式输出中间结果。如果您需要将数据推送到行中,或者将 sqlite 作为数据库。

至少有两个可能的选择。

  1. 每次调用 myFunction 都将其输出保存到一个唯一命名的文件中。文件名应基于或链接到输入数据。使用父程序收集结果。在这种情况下 myFunction 应该 return 已完成项目的标识符。
  2. 使用 imap_unordered 而不是 map。这将在结果可用时立即开始产生结果,而不是在所有处理完成后返回。让父程序保存 returned 数据并指示哪些项目已完成。

在这两种情况下,程序都必须检查以前运行时保存的数据,以便在重新启动时调整 myListOfInputParameters

哪个选项最好在很大程度上取决于 return 由 myFunction 编辑的数据量。如果这是一个很大的数额,那么将其传回父级会产生大量开销。在那种情况下,选项 1 可能是最好的。

由于写入磁盘相对较慢,使用选项 2 的计算可能会更快。而且父程序更容易跟踪进度。

请注意,您也可以将 imap_unordered 与选项 1 结合使用。