我如何判断 AsyncResult 是否永远不会准备好?
How can I tell if an AsyncResult will never be ready?
场景 - 我有一个正在分配任务的进程池。但是,如果子进程在 运行 任务时被终止,则 AsyncResult
对象将永远不会被标记为就绪。我希望发生的是它会被标记为准备就绪但不成功。
要重现此内容:
>>> import multiprocessing
>>> import time
>>> p = multiprocessing.Pool(processes=1)
>>> result = p.apply_async(time.sleep, args=(1000,))
>>> result.ready()
False
在另一个 shell 中,找到进程 ID 并杀死它。
>>> result.ready()
False
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead
这是一个问题,因为我有一个线程正在等待作业完成,而且它通常有相当长的超时时间。如何在无需等待超时的情况下完成 result.wait(timeout)
调用?另外,我怎么知道它已被放弃,而不仅仅是任务仍在 运行 但我们达到了超时?
Pebble 库会在进程意外终止时通知您。它还支持超时和回调。
这是你的例子。
from pebble import ProcessPool
from concurrent.futures import TimeoutError
with ProcessPool() as pool:
future = pool.schedule(time.sleep, args=(1000,), timeout=100)
try:
results = future.result()
print(results)
except TimeoutError as error:
print("Function took longer than %d seconds" % error.args[1])
except ProcessExpired as error:
print("%s. Exit code: %d" % (error, error.exitcode))
except Exception as error:
print("function raised %s" % error)
print(error.traceback) # Python's traceback of remote process
documentation 中有更多示例。
调用 result.wait()
将等到它到达 timeout
,除非它从池中收到信号。但是,如果您这样做 kill -9 [pid]
,那么池将立即启动队列中的下一个作业。
因此,使用它更容易,然后手动 "poll" 并检查 ready()
。正如您所说的那样,问题是 ready()
在作业被终止时仍然 False
。
要解决此问题,您可以检查 pid 是否处于活动状态。由于 ApplyResult
不携带 pid,您需要通过其他方式获取它。你可以这样做:
def test(identifier):
pid = os.getpid()
f = open("pids/" + str(pid), "w")
f.write(str(identifier))
f.close()
# do stuff
time.sleep(1000)
然后像这样创建工作(考虑jobs = []
)。
job = (identifier, pool.apply_async(test, (identifier,)))
jobs.append(job)
标识符不是必需的,但如果您稍后想弄清楚哪个 ApplyResult
属于哪个 pid,则该标识符很有用。
然后您可以获取所有作业并检查每个作业 (pid) 是否存在:
def is_alive(pid):
return os.path.exists("/proc/" + str(pid))
for pid in os.listdir("pids"):
if is_alive(pid):
...
else:
...
考虑每个 pid 命名文件的内容。然后使用在 jobs
中找到的 identifier
,您现在可以 link 向上 ApplyResult
属于哪个 pid 并具体检查哪个作业已死或 ready()
或者如果以上none那么还是运行.
您还可以创建一个管道并派生一个子进程。
r, w = os.pipe()
def child():
global r, w
data = ...
time.sleep(100)
os.close(r)
w = os.fdopen(w, "w")
w.write(data)
w.close()
然后您只需将数据写回您的父进程。
def parent(child_pid):
global r, w
os.close(w)
r = os.fdopen(r)
data = r.read()
r.close()
status = os.waitpid(child_pid, 0)
if status == 0:
# Everything is fine
elif status == 9:
# kill -9 [pid]
# Process data
然后您可以利用收到的 status
和 data
来决定子进程发生了什么。
一切从实践开始。
if __name__ == "__main__":
child_pid = os.fork()
if child_pid:
parent(child_pid)
else:
child()
根据你的问题,我假设是 Unix。如果没有,请随时纠正我。另外,如果有任何非Python 2.7 偷偷进入答案,那么我深表歉意。
场景 - 我有一个正在分配任务的进程池。但是,如果子进程在 运行 任务时被终止,则 AsyncResult
对象将永远不会被标记为就绪。我希望发生的是它会被标记为准备就绪但不成功。
要重现此内容:
>>> import multiprocessing
>>> import time
>>> p = multiprocessing.Pool(processes=1)
>>> result = p.apply_async(time.sleep, args=(1000,))
>>> result.ready()
False
在另一个 shell 中,找到进程 ID 并杀死它。
>>> result.ready()
False
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead
这是一个问题,因为我有一个线程正在等待作业完成,而且它通常有相当长的超时时间。如何在无需等待超时的情况下完成 result.wait(timeout)
调用?另外,我怎么知道它已被放弃,而不仅仅是任务仍在 运行 但我们达到了超时?
Pebble 库会在进程意外终止时通知您。它还支持超时和回调。
这是你的例子。
from pebble import ProcessPool
from concurrent.futures import TimeoutError
with ProcessPool() as pool:
future = pool.schedule(time.sleep, args=(1000,), timeout=100)
try:
results = future.result()
print(results)
except TimeoutError as error:
print("Function took longer than %d seconds" % error.args[1])
except ProcessExpired as error:
print("%s. Exit code: %d" % (error, error.exitcode))
except Exception as error:
print("function raised %s" % error)
print(error.traceback) # Python's traceback of remote process
documentation 中有更多示例。
调用 result.wait()
将等到它到达 timeout
,除非它从池中收到信号。但是,如果您这样做 kill -9 [pid]
,那么池将立即启动队列中的下一个作业。
因此,使用它更容易,然后手动 "poll" 并检查 ready()
。正如您所说的那样,问题是 ready()
在作业被终止时仍然 False
。
要解决此问题,您可以检查 pid 是否处于活动状态。由于 ApplyResult
不携带 pid,您需要通过其他方式获取它。你可以这样做:
def test(identifier):
pid = os.getpid()
f = open("pids/" + str(pid), "w")
f.write(str(identifier))
f.close()
# do stuff
time.sleep(1000)
然后像这样创建工作(考虑jobs = []
)。
job = (identifier, pool.apply_async(test, (identifier,)))
jobs.append(job)
标识符不是必需的,但如果您稍后想弄清楚哪个 ApplyResult
属于哪个 pid,则该标识符很有用。
然后您可以获取所有作业并检查每个作业 (pid) 是否存在:
def is_alive(pid):
return os.path.exists("/proc/" + str(pid))
for pid in os.listdir("pids"):
if is_alive(pid):
...
else:
...
考虑每个 pid 命名文件的内容。然后使用在 jobs
中找到的 identifier
,您现在可以 link 向上 ApplyResult
属于哪个 pid 并具体检查哪个作业已死或 ready()
或者如果以上none那么还是运行.
您还可以创建一个管道并派生一个子进程。
r, w = os.pipe()
def child():
global r, w
data = ...
time.sleep(100)
os.close(r)
w = os.fdopen(w, "w")
w.write(data)
w.close()
然后您只需将数据写回您的父进程。
def parent(child_pid):
global r, w
os.close(w)
r = os.fdopen(r)
data = r.read()
r.close()
status = os.waitpid(child_pid, 0)
if status == 0:
# Everything is fine
elif status == 9:
# kill -9 [pid]
# Process data
然后您可以利用收到的 status
和 data
来决定子进程发生了什么。
一切从实践开始。
if __name__ == "__main__":
child_pid = os.fork()
if child_pid:
parent(child_pid)
else:
child()
根据你的问题,我假设是 Unix。如果没有,请随时纠正我。另外,如果有任何非Python 2.7 偷偷进入答案,那么我深表歉意。