return 超时后多处理中完成的任务值 - python
return completed task values in multiprocessing after timeout - python
我想 return 在杀死所有正在进行的和排队的任务后,在给定的超时时间内,多处理中已经完成的任务值。
例如,以下函数需要 运行 并行使用 pool.starmap()
对于值,1 到 100。
def func(x):
time.sleep(1)
return x * 2
假设在 5 秒(定义的超时)后,与值 1 到 10 相关的任务已完成,而 11-90 仍在 运行ning 或排队。然后在 5 秒时,我想终止所有进程和 return 已完成任务的值 1 到 10 的值。那么 100 个任务的 return 列表应该是 [2,4, 6,8,10, ....18, 20, None, None,....None]。
我试过的主要功能如下,
def main():
pool = multiprocessing.Pool(processes=3)
val = [[i] for i in range(100)]
result_obj = pool.starmap_async(func, val)
result_obj.wait(5)
if result_obj.ready():
result = result_obj.get(1)
但是,这里的进程仍在后台 运行,无法找到杀死这些进程并在 5 秒后捕获已完成任务的方法。这可能吗?
包含您需要的所有信息:要在生成结果时检索结果,imap_unordered 可能是最好的函数,因为它 returns 结果到主线程一旦完成。您只需要执行一些簿记以确保结果最终位于结果队列中的正确位置。实现这一点的一种方法是将索引传递给并行函数,然后该函数 returns.
下面的一些简化的伪代码,您应该能够通过以下方法得出解决方案:
def worker(id, other_args):
arg1, arg2, ..., argn = other_args # Assuming other_args is a tuple or something else unpackable
do something
return idx, result
obtained_results = [None] * nr_tasks
with multiprocessing.Pool() as pool:
for idx, result in pool.imap_unordered(worker, enumerate(arg_list)):
obtained_results[idx] = result
if time > timeout:
pool.terminate()
break
唯一的小缺点是只有在返回第一个超过超时的结果后才会触发超时。对于短期任务,这应该不是问题。如果您有更长的 运行 任务,您可能需要其他东西,但 pool.terminate()
可能仍然是适合您的方法。
我想 return 在杀死所有正在进行的和排队的任务后,在给定的超时时间内,多处理中已经完成的任务值。
例如,以下函数需要 运行 并行使用 pool.starmap()
对于值,1 到 100。
def func(x):
time.sleep(1)
return x * 2
假设在 5 秒(定义的超时)后,与值 1 到 10 相关的任务已完成,而 11-90 仍在 运行ning 或排队。然后在 5 秒时,我想终止所有进程和 return 已完成任务的值 1 到 10 的值。那么 100 个任务的 return 列表应该是 [2,4, 6,8,10, ....18, 20, None, None,....None]。 我试过的主要功能如下,
def main():
pool = multiprocessing.Pool(processes=3)
val = [[i] for i in range(100)]
result_obj = pool.starmap_async(func, val)
result_obj.wait(5)
if result_obj.ready():
result = result_obj.get(1)
但是,这里的进程仍在后台 运行,无法找到杀死这些进程并在 5 秒后捕获已完成任务的方法。这可能吗?
下面的一些简化的伪代码,您应该能够通过以下方法得出解决方案:
def worker(id, other_args):
arg1, arg2, ..., argn = other_args # Assuming other_args is a tuple or something else unpackable
do something
return idx, result
obtained_results = [None] * nr_tasks
with multiprocessing.Pool() as pool:
for idx, result in pool.imap_unordered(worker, enumerate(arg_list)):
obtained_results[idx] = result
if time > timeout:
pool.terminate()
break
唯一的小缺点是只有在返回第一个超过超时的结果后才会触发超时。对于短期任务,这应该不是问题。如果您有更长的 运行 任务,您可能需要其他东西,但 pool.terminate()
可能仍然是适合您的方法。