Python JoinableQueue: task_done needs to be called twice from another process

I have implemented a WorkerManager based on multiprocessing.Process and JoinableQueue. When I try to handle process failures such as a timeout or an unhandled exception, I evaluate proc.exitcode after proc.join(timeout) to decide how to handle the job, and then call in_queue.task_done() to notify the queue that the job was finished by the exception-handling logic. However, task_done has to be called twice, and I don't know why. Can anyone figure out the reason?

The whole code snippet:

# -*- coding=utf-8 -*-

import time
import threading
from queue import Empty
from multiprocessing import Event, Process, JoinableQueue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = self._processes_num
        self._in_queue, self._run_queue, self._out_queue = JoinableQueue(), JoinableQueue(), JoinableQueue()
        self._spawned_procs = []
        self._total = 0
        self._stop_event = Event()
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue,
            stop_event=self._stop_event
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join(TIMEOUT)
        collector.join(TIMEOUT)

        self._in_queue.join()
        self._out_queue.join()
        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
                for _ in range(2):
                    # NOTE: Call task_done twice
                    # Guessing:
                    # 1st time to switch process?
                    # 2nd time to notify that the task is done?
                    # TODO: figure out why
                    self._in_queue.task_done()
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)
                    for _ in range(2):
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to switch process?
                        # 2nd time to notify that the task is done?
                        # TODO: figure out why
                        self._in_queue.task_done()

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            try:
                r = self._out_queue.get()
                self._out_stream.append(r)
                self._out_queue.task_done()

                if len(self._out_stream) >= self._total:
                    print("Total {} jobs done.".format(len(self._out_stream)))
                    self._stop_event.set()
                    break
            except Empty:
                continue

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            self._run_queue.task_done()
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue, stop_event):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue
        self._stop_event = stop_event

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while not self._stop_event.is_set():
            try:
                job = self._in_queue.get(timeout=.01)
            except Empty:
                continue

            if not job:
                self._in_queue.task_done()
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)
            finally:
                self._in_queue.task_done()

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():

    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()

The problem code is as follows:

                    self._out_queue.put(result)
                    for _ in range(2):
                        # ISSUE HERE !!!
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to switch process?
                        # 2nd time to notify that the task is done?
                        # TODO: figure out why
                        self._in_queue.task_done()

I need to call self._in_queue.task_done() twice to notify the JoinableQueue that the job was finished by the exception-handling logic.

My guess was that the first task_done() call somehow switches the process context, or something like that. According to my tests, it is the second task_done() that takes effect.

worker - 0 start to work
worker - 1 start to work
worker - 2 start to work

Process TIMEOUT: Process-1 5
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.

If you call task_done() only once, the queue's join() blocks forever and never completes.
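
To reproduce the queue semantics in isolation, here is a minimal sketch (separate from my WorkerManager code) showing that join() only returns once task_done() has been called for every put(), including the None sentinel:

from multiprocessing import JoinableQueue

q = JoinableQueue()
q.put('job')   # the job the worker picked up
q.put(None)    # the sentinel the worker never consumed

q.get()
q.task_done()  # acknowledges 'job' -- the unfinished count drops to 1

# q.join() would still hang here: the sentinel was put() but never
# acknowledged. A second task_done() is needed for it (task_done does
# not have to be paired with a get, only with a put).
q.task_done()
q.join()       # returns immediately now
print('all items acknowledged')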

The problem is that you have a race condition, defined as:

A race condition arises in software when a computer program, to operate properly, depends on the sequence or timing of the program's processes or threads.

In the method WorkerProcess._work, your main loop begins with:

    while not self._stop_event.is_set():
        try:
            job = self._in_queue.get(timeout=.01)
        except Empty:
            continue

        if not job:
            self._in_queue.task_done()
            break

self._stop_event is set by the _collect thread. Depending on where WorkerProcess._work happens to be in its loop when that occurs, the worker can exit the loop leaving behind the None that had been placed on the _in_queue to signify that there are no more jobs. Apparently this happened for two of your processes; it could just as well have happened for 0, 1, 2, or 3 of them.

The fix is to replace while not self._stop_event.is_set(): with while True: and to rely solely on finding None on the _in_queue to signify termination, as sketched below. This allows you to remove the extra calls to task_done made for the processes that completed normally (in fact, you only ever needed one extra call per process that completed successfully, not the two you have).
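
Concretely, the corrected loop in WorkerProcess._work would look like this (a sketch of just the change described above, with the rest of the method unchanged):

    while True:
        try:
            job = self._in_queue.get(timeout=.01)
        except Empty:
            continue

        # The None sentinel is now the only exit condition, so it can
        # no longer be left behind on the queue by a racing stop event.
        if not job:
            self._in_queue.task_done()
            break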

But that is only half the problem. The other half is this code of yours:

def _join_workers(self):
    for p in self._spawned_procs:
        p.join(TIMEOUT)
        ...
            p.terminate()

Consequently, you are not giving your workers enough time to exhaust the _in_queue, and so an arbitrary number of messages could be left on it (in your example there would just be the current "job" being processed plus the None sentinel, for a total of 2).

But this points to a general problem with the code: it is over-engineered. As an example, refer back to the first code snippet above. It can be simplified further to:

    while True:
        job = self._in_queue.get() # blocking get
        if not job:
            break

Moreover, there is no reason even to be using JoinableQueue or Event instances, since a None sentinel placed on the _in_queue is sufficient to signify that the worker processes should terminate, especially if you are going to terminate workers prematurely anyway. The simplified, working code is:

import time
import threading
from multiprocessing import Process, Queue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = self._processes_num
        self._in_queue, self._run_queue, self._out_queue = Queue(), Queue(), Queue()
        self._spawned_procs = []
        self._total = 0
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join()
        collector.join()

        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            r = self._out_queue.get()
            self._out_stream.append(r)
            if len(self._out_stream) >= self._total:
                print("Total {} jobs done.".format(len(self._out_stream)))
                break

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while True:
            job = self._in_queue.get()
            if not job:
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():

    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()

This prints:

worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 3
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.

You may already be aware of this, but due diligence requires me to mention that there are two excellent classes for doing what you are trying to accomplish: multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor. See this for a comparison.
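
For example, here is a minimal sketch of my own (assuming a per-job timeout is acceptable in place of your per-process one) of the same workload driven by concurrent.futures.ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

TIMEOUT = 3

def run_job(job):
    time.sleep(job)
    return {'status': 'succeed'}

def main():
    jobs = [3, 4, 5, 6, 7]
    results = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_job, job) for job in jobs]
        for future in futures:
            try:
                # Wait up to TIMEOUT seconds for each result. Note that,
                # unlike Process.terminate(), a timed-out job keeps
                # running in its worker until it finishes on its own.
                results.append(future.result(timeout=TIMEOUT))
            except TimeoutError:
                results.append({'status': 'failed'})
    print(results)

if __name__ == '__main__':
    main()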

Further explanation

What is the point of using a JoinableQueue, which supports calls to task_done? Usually it is so that you can be sure all the messages you placed on the queue have been taken off and processed, and so that the main process does not terminate prematurely before that has happened. But this could not work correctly in the code as you had it, because you gave each process only TIMEOUT seconds to handle its messages and then terminated it if it was still alive, possibly with messages still left on its queue. That is what forced you to artificially issue extra calls to task_done just so that your calls to join on the queues in the main process would not hang, and it is why you had to post this question in the first place.

So you could have proceeded in two different ways. One would have allowed you to keep using JoinableQueue instances and calling join on them to know when to terminate. But (1) you would then not be able to terminate your message processes prematurely, and (2) your message processes would have to handle exceptions correctly so that they never terminate early without having emptied their queue.

The other way is the one I proposed, which is simpler. The main process just places a special sentinel message, in this case None, on the input queue. It is simply a message that cannot be mistaken for a real message to be processed and instead signifies end of file, or, put another way, it signals to a message process that no more messages will be placed on the queue and that it may now terminate. The main process therefore has to place, in addition to the "real" messages, one extra sentinel message per worker on the queue. Then, instead of issuing join calls on the message queues (which are now just regular, non-joinable queues), it issues join(TIMEOUT) on each process instance. Either you will find the process is no longer alive because it has seen the sentinel, and therefore you know it has processed all of its messages, or, if you are willing to leave messages on its input queue, you can call terminate on the process.

Of course, to be really sure that processes you terminated yourself actually emptied their queues, you might want to inspect their queues and verify that they are indeed empty. But I am assuming you should be able to code your processes to handle exceptions correctly, at least those that can be handled, so that they do not terminate prematurely and do something "reasonable" with every message.
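
If you did want that extra certainty, a small hypothetical helper along these lines (my own sketch, not part of the code above) could drain whatever a terminated worker left behind:

from queue import Empty

def drain(q):
    # Pull any leftover messages off a multiprocessing queue after its
    # worker was terminated, so they can be logged or retried.
    leftovers = []
    while True:
        try:
            leftovers.append(q.get_nowait())
        except Empty:
            break
    return leftovers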