Python 多处理队列慢

Python Multiprocessing Queue Slow

我对 python 多处理队列有疑问。 我正在对一些数据进行一些艰苦的计算。我创建了几个进程来减少计算时间,数据在发送到进程之前也被平均分割。它很好地减少了计算时间,但是当我想通过 multiprocessing.Queue 从进程中 return 数据时,它需要很长时间,而且整个过程比在主线程中计算要慢。

    processes = []
    proc = 8
    for i in range(proc):
           processes.append(multiprocessing.Process(target=self.calculateTriangles, args=(inData[i],outData,timer)))
    for p in processes:
        p.start()
    results = []
    for i in range(proc):
        results.append(outData.get())
    print("killing threads")
    print(datetime.datetime.now() - timer)
    for p in processes:
        p.join()
    print("Finish Threads")
    print(datetime.datetime.now() - timer)

所有线程在完成时打印它们的完成时间。这是此代码的示例输出

0:00:00.017873 CalcDone    
0:00:01.692940 CalcDone
0:00:01.777674 CalcDone
0:00:01.780019 CalcDone
0:00:01.796739 CalcDone
0:00:01.831723 CalcDone
0:00:01.842356 CalcDone
0:00:01.868633 CalcDone
0:00:05.497160 killing threads
60968 calculated triangles 

如您所见,在此代码之前一切都很简单。

    for i in range(proc):
        results.append(outData.get())
    print("killing threads")
    print(datetime.datetime.now() - timer)

以下是我在我的电脑和较慢的电脑上所做的一些观察。 https://docs.google.com/spreadsheets/d/1_8LovX0eSgvNW63-xh8L9-uylAVlzY4VSPUQ1yP2F9A/edit?usp=sharing。如您所见,在较慢的版本上没有任何改进。

为什么在处理完成后从队列中获取项目需要这么长时间??有什么办法可以加快速度吗?

所以我自己解决了。计算速度很快,但将对象从一个进程复制到另一个进程需要很长时间。我刚刚创建了一个清除对象中所有不必要字段的方法,同时使用 pipes 比多处理 queues 更快。它将我较慢的计算机上的时间从 29 秒减少到 15 秒。

这段时间主要花在将另一个对象放入队列和增加信号量计数上。如果你能够一次批量插入所有数据的队列,那么你减少到以前的 1/10。

我在旧方法的基础上动态地为队列分配了一个新方法。转到您的 Python 版本的多处理模块:

/usr/lib/pythonx.x/multiprocessing.queues.py

将 class 的 "put" 方法复制到您的项目中,例如对于 Python 3.7:

def put(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} has been closed".format(self)
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

修改:

def put_bla(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} has been closed".format(self)

    for el in obj:
        if not self._sem.acquire(block, timeout):  #spike the semaphore count
            raise Full
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer += el  # adding a collections.deque object
            self._notempty.notify()

最后一步是将新方法添加到 class。 multiprocessing.Queue 是一个 DefaultContext 方法,returns 是一个 Queue 对象。将方法直接注入创建对象的class更容易。所以:

from collections import deque

queue = Queue()
queue.__class__.put_bulk = put_bla  # injecting new method
items = (500, 400, 450, 350) * count  # (500, 400, 450, 350, 500, 400...)
queue.put_bulk(deque(items))

不幸的是,multiprocessing.Pool 总是快 10%,所以如果您不需要长期工作人员来处理您的任务,请坚持使用它。它基于 multiprocessing.SimpleQueue,而 multiprocessing.SimpleQueue 基于 multiprocessing.Pipe,我不知道为什么它更快,因为我的 SimpleQueue 解决方案不是,而且它不是批量注入的:)打破它,你就会有有史以来最快的工人:)