我如何判断 AsyncResult 是否永远不会准备好?

How can I tell if an AsyncResult will never be ready?

场景 - 我有一个正在分配任务的进程池。但是,如果子进程在 运行 任务时被终止,则 AsyncResult 对象将永远不会被标记为就绪。我希望发生的是它会被标记为准备就绪但不成功。

要重现此内容:

>>> import multiprocessing
>>> import time
>>> p = multiprocessing.Pool(processes=1)
>>> result = p.apply_async(time.sleep, args=(1000,))
>>> result.ready()
False

在另一个 shell 中,找到进程 ID 并杀死它。

>>> result.ready()
False
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead

这是一个问题,因为我有一个线程正在等待作业完成,而且它通常有相当长的超时时间。如何在无需等待超时的情况下完成 result.wait(timeout) 调用?另外,我怎么知道它已被放弃,而不仅仅是任务仍在 运行 但我们达到了超时?

Pebble 库会在进程意外终止时通知您。它还支持超时和回调。

这是你的例子。

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    future = pool.schedule(time.sleep, args=(1000,), timeout=100)

    try:
        results = future.result()
        print(results)
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python's traceback of remote process

documentation 中有更多示例。

调用 result.wait() 将等到它到达 timeout,除非它从池中收到信号。但是,如果您这样做 kill -9 [pid],那么池将立即启动队列中的下一个作业。

因此,使用它更容易,然后手动 "poll" 并检查 ready()。正如您所说的那样,问题是 ready() 在作业被终止时仍然 False

要解决此问题,您可以检查 pid 是否处于活动状态。由于 ApplyResult 不携带 pid,您需要通过其他方式获取它。你可以这样做:

def test(identifier):
    pid = os.getpid()

    f = open("pids/" + str(pid), "w")
    f.write(str(identifier))
    f.close()

    # do stuff
    time.sleep(1000)

然后像这样创建工作(考虑jobs = [])。

job = (identifier, pool.apply_async(test, (identifier,)))
jobs.append(job)

标识符不是必需的,但如果您稍后想弄清楚哪个 ApplyResult 属于哪个 pid,则该标识符很有用。

然后您可以获取所有作业并检查每个作业 (pid) 是否存在:

def is_alive(pid):
    return os.path.exists("/proc/" + str(pid))

for pid in os.listdir("pids"):
    if is_alive(pid):
        ...
    else:
        ...

考虑每个 pid 命名文件的内容。然后使用在 jobs 中找到的 identifier,您现在可以 link 向上 ApplyResult 属于哪个 pid 并具体检查哪个作业已死或 ready() 或者如果以上none那么还是运行.


您还可以创建一个管道并派生一个子进程。

r, w = os.pipe()

def child():
    global r, w

    data = ...
    time.sleep(100)

    os.close(r)
    w = os.fdopen(w, "w")
    w.write(data)
    w.close()

然后您只需将数据写回您的父进程。

def parent(child_pid):
    global r, w

    os.close(w)

    r = os.fdopen(r)
    data = r.read()
    r.close()

    status = os.waitpid(child_pid, 0)
    if status == 0:
        # Everything is fine
    elif status == 9:
        # kill -9 [pid]

    # Process data

然后您可以利用收到的 statusdata 来决定子进程发生了什么。

一切从实践开始。

if __name__ == "__main__":
    child_pid = os.fork()
    if child_pid:
        parent(child_pid)
    else:
        child()

根据你的问题,我假设是 Unix。如果没有,请随时纠正我。另外,如果有任何非Python 2.7 偷偷进入答案,那么我深表歉意。