return 超时后多处理中完成的任务值 - python

return completed task values in multiprocessing after timeout - python

我想 return 在杀死所有正在进行的和排队的任务后,在给定的超时时间内,多处理中已经完成的任务值。 例如,以下函数需要 运行 并行使用 pool.starmap() 对于值,1 到 100。

def func(x):
  time.sleep(1)
  return x * 2

假设在 5 秒(定义的超时)后,与值 1 到 10 相关的任务已完成,而 11-90 仍在 运行ning 或排队。然后在 5 秒时,我想终止所有进程和 return 已完成任务的值 ​​1 到 10 的值。那么 100 个任务的 return 列表应该是 [2,4, 6,8,10, ....18, 20, None, None,....None]。 我试过的主要功能如下,

def main():
  pool = multiprocessing.Pool(processes=3)
  val = [[i] for i in range(100)]
  result_obj = pool.starmap_async(func, val)
  result_obj.wait(5)
  if result_obj.ready():
     result = result_obj.get(1)

但是,这里的进程仍在后台 运行,无法找到杀死这些进程并在 5 秒后捕获已完成任务的方法。这可能吗?

包含您需要的所有信息:要在生成结果时检索结果,imap_unordered 可能是最好的函数,因为它 returns 结果到主线程一旦完成。您只需要执行一些簿记以确保结果最终位于结果队列中的正确位置。实现这一点的一种方法是将索引传递给并行函数,然后该函数 returns.

下面的一些简化的伪代码,您应该能够通过以下方法得出解决方案:

def worker(id, other_args):
    arg1, arg2, ..., argn = other_args # Assuming other_args is a tuple or something else unpackable
    do something
    return idx, result

obtained_results = [None] * nr_tasks
with multiprocessing.Pool() as pool:
    for idx, result in pool.imap_unordered(worker, enumerate(arg_list)):
        obtained_results[idx] = result
        if time > timeout:
            pool.terminate()
            break

唯一的小缺点是只有在返回第一个超过超时的结果后才会触发超时。对于短期任务,这应该不是问题。如果您有更长的 运行 任务,您可能需要其他东西,但 pool.terminate() 可能仍然是适合您的方法。