Python 多处理队列慢
Python Multiprocessing Queue Slow
我对 python 多处理队列有疑问。
我正在对一些数据进行一些艰苦的计算。我创建了几个进程来减少计算时间,数据在发送到进程之前也被平均分割。它很好地减少了计算时间,但是当我想通过 multiprocessing.Queue 从进程中 return 数据时,它需要很长时间,而且整个过程比在主线程中计算要慢。
processes = []
proc = 8
for i in range(proc):
processes.append(multiprocessing.Process(target=self.calculateTriangles, args=(inData[i],outData,timer)))
for p in processes:
p.start()
results = []
for i in range(proc):
results.append(outData.get())
print("killing threads")
print(datetime.datetime.now() - timer)
for p in processes:
p.join()
print("Finish Threads")
print(datetime.datetime.now() - timer)
所有线程在完成时打印它们的完成时间。这是此代码的示例输出
0:00:00.017873 CalcDone
0:00:01.692940 CalcDone
0:00:01.777674 CalcDone
0:00:01.780019 CalcDone
0:00:01.796739 CalcDone
0:00:01.831723 CalcDone
0:00:01.842356 CalcDone
0:00:01.868633 CalcDone
0:00:05.497160 killing threads
60968 calculated triangles
如您所见,在此代码之前一切都很简单。
for i in range(proc):
results.append(outData.get())
print("killing threads")
print(datetime.datetime.now() - timer)
以下是我在我的电脑和较慢的电脑上所做的一些观察。
https://docs.google.com/spreadsheets/d/1_8LovX0eSgvNW63-xh8L9-uylAVlzY4VSPUQ1yP2F9A/edit?usp=sharing。如您所见,在较慢的版本上没有任何改进。
为什么在处理完成后从队列中获取项目需要这么长时间??有什么办法可以加快速度吗?
所以我自己解决了。计算速度很快,但将对象从一个进程复制到另一个进程需要很长时间。我刚刚创建了一个清除对象中所有不必要字段的方法,同时使用 pipes
比多处理 queues
更快。它将我较慢的计算机上的时间从 29 秒减少到 15 秒。
这段时间主要花在将另一个对象放入队列和增加信号量计数上。如果你能够一次批量插入所有数据的队列,那么你减少到以前的 1/10。
我在旧方法的基础上动态地为队列分配了一个新方法。转到您的 Python 版本的多处理模块:
/usr/lib/pythonx.x/multiprocessing.queues.py
将 class 的 "put" 方法复制到您的项目中,例如对于 Python 3.7:
def put(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} has been closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
修改:
def put_bla(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} has been closed".format(self)
for el in obj:
if not self._sem.acquire(block, timeout): #spike the semaphore count
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer += el # adding a collections.deque object
self._notempty.notify()
最后一步是将新方法添加到 class。 multiprocessing.Queue 是一个 DefaultContext 方法,returns 是一个 Queue 对象。将方法直接注入创建对象的class更容易。所以:
from collections import deque
queue = Queue()
queue.__class__.put_bulk = put_bla # injecting new method
items = (500, 400, 450, 350) * count # (500, 400, 450, 350, 500, 400...)
queue.put_bulk(deque(items))
不幸的是,multiprocessing.Pool 总是快 10%,所以如果您不需要长期工作人员来处理您的任务,请坚持使用它。它基于 multiprocessing.SimpleQueue,而 multiprocessing.SimpleQueue 基于 multiprocessing.Pipe,我不知道为什么它更快,因为我的 SimpleQueue 解决方案不是,而且它不是批量注入的:)打破它,你就会有有史以来最快的工人:)
我对 python 多处理队列有疑问。 我正在对一些数据进行一些艰苦的计算。我创建了几个进程来减少计算时间,数据在发送到进程之前也被平均分割。它很好地减少了计算时间,但是当我想通过 multiprocessing.Queue 从进程中 return 数据时,它需要很长时间,而且整个过程比在主线程中计算要慢。
processes = []
proc = 8
for i in range(proc):
processes.append(multiprocessing.Process(target=self.calculateTriangles, args=(inData[i],outData,timer)))
for p in processes:
p.start()
results = []
for i in range(proc):
results.append(outData.get())
print("killing threads")
print(datetime.datetime.now() - timer)
for p in processes:
p.join()
print("Finish Threads")
print(datetime.datetime.now() - timer)
所有线程在完成时打印它们的完成时间。这是此代码的示例输出
0:00:00.017873 CalcDone
0:00:01.692940 CalcDone
0:00:01.777674 CalcDone
0:00:01.780019 CalcDone
0:00:01.796739 CalcDone
0:00:01.831723 CalcDone
0:00:01.842356 CalcDone
0:00:01.868633 CalcDone
0:00:05.497160 killing threads
60968 calculated triangles
如您所见,在此代码之前一切都很简单。
for i in range(proc):
results.append(outData.get())
print("killing threads")
print(datetime.datetime.now() - timer)
以下是我在我的电脑和较慢的电脑上所做的一些观察。 https://docs.google.com/spreadsheets/d/1_8LovX0eSgvNW63-xh8L9-uylAVlzY4VSPUQ1yP2F9A/edit?usp=sharing。如您所见,在较慢的版本上没有任何改进。
为什么在处理完成后从队列中获取项目需要这么长时间??有什么办法可以加快速度吗?
所以我自己解决了。计算速度很快,但将对象从一个进程复制到另一个进程需要很长时间。我刚刚创建了一个清除对象中所有不必要字段的方法,同时使用 pipes
比多处理 queues
更快。它将我较慢的计算机上的时间从 29 秒减少到 15 秒。
这段时间主要花在将另一个对象放入队列和增加信号量计数上。如果你能够一次批量插入所有数据的队列,那么你减少到以前的 1/10。
我在旧方法的基础上动态地为队列分配了一个新方法。转到您的 Python 版本的多处理模块:
/usr/lib/pythonx.x/multiprocessing.queues.py
将 class 的 "put" 方法复制到您的项目中,例如对于 Python 3.7:
def put(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} has been closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
修改:
def put_bla(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} has been closed".format(self)
for el in obj:
if not self._sem.acquire(block, timeout): #spike the semaphore count
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer += el # adding a collections.deque object
self._notempty.notify()
最后一步是将新方法添加到 class。 multiprocessing.Queue 是一个 DefaultContext 方法,returns 是一个 Queue 对象。将方法直接注入创建对象的class更容易。所以:
from collections import deque
queue = Queue()
queue.__class__.put_bulk = put_bla # injecting new method
items = (500, 400, 450, 350) * count # (500, 400, 450, 350, 500, 400...)
queue.put_bulk(deque(items))
不幸的是,multiprocessing.Pool 总是快 10%,所以如果您不需要长期工作人员来处理您的任务,请坚持使用它。它基于 multiprocessing.SimpleQueue,而 multiprocessing.SimpleQueue 基于 multiprocessing.Pipe,我不知道为什么它更快,因为我的 SimpleQueue 解决方案不是,而且它不是批量注入的:)打破它,你就会有有史以来最快的工人:)