python multiprocessing pool timeout
I want to use multiprocessing.Pool, but multiprocessing.Pool can't abort a task after a timeout. I found this solution and modified it slightly:
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time
def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
        if time.time() - start >= y:
            break
        time.sleep(0.5)
        # show work progress
        print(y)
    return y

def collect_my_result(result):
    print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        # args[0] is the sleep time that was passed to worker
        print("Aborting due to timeout {}".format(args[0]))
        # kill the pool worker itself when we get a TimeoutError
        sys.exit(1)
    else:
        return out

def empty_func():
    pass
if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)

    # k - time to job sleep
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # check available worker
        pool.apply(empty_func)

        # run job with timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)

    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")
The main modification is that the pool worker exits with sys.exit(1). This kills both the worker process and the worker thread it spawned, but I'm not sure this solution is good. What potential problems can I run into when a process is terminated while it is still running a job?
There is no implicit risk in stopping a running job; the OS will take care of terminating the process correctly.
If your job is writing to files, you might end up with lots of truncated files on your disk (the sketch below illustrates this).
Some minor issues might arise as well if you write to a DB or are connected to some remote process.
However, the standard Python Pool does not support terminating workers when a task times out, and terminating processes abruptly might lead to weird behaviour within your application.
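As an illustration of the truncated-file risk, here is a minimal, hypothetical sketch (the slow_writer function and the output.txt path are made up for this example): a child process is terminated abruptly mid-write, so only the part of its output flushed before termination reaches the disk.

import os
import time
from multiprocessing import Process

def slow_writer(path):
    # keep appending lines slowly so the termination lands mid-write
    with open(path, "w") as f:
        for i in range(1000):
            f.write("line {}\n".format(i))
            f.flush()
            time.sleep(0.01)

if __name__ == "__main__":
    p = Process(target=slow_writer, args=("output.txt",))
    p.start()
    time.sleep(1)     # let it write for a while
    p.terminate()     # abrupt termination, similar in effect to pool.terminate()
    p.join()
    # only the lines written before termination are on disk
    print("bytes written:", os.path.getsize("output.txt"))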
The Pebble processing pool does support timing-out tasks:
from pebble import process, TimeoutError

with process.Pool() as pool:
    task = pool.schedule(function, args=[1, 2], timeout=5)

    try:
        result = task.get()
    except TimeoutError:
        print("Task: %s took more than 5 seconds to complete" % task)