试图让两个子进程分担处理同一资源的负载时出现问题
Problem trying to make two child processes share the load of processing the same resource
我正在摆弄 python 多处理模块。但是有些东西并没有像我期望的那样工作,所以现在我有点困惑。
在 python 脚本中,我创建了两个子进程,因此它们可以使用相同的资源。我在想他们会或多或少地平等地 "share" 负载,但似乎,而不是这样做,一个进程只执行一次,而另一个进程几乎处理所有内容。
为了测试它,我写了下面的代码:
#!/usr/bin/python
import os
import multiprocessing
# Worker function
def worker(queueA, queueB):
while(queueA.qsize() != 0):
item = queueA.get()
item = "item: " + item + ". processed by worker " + str(os.getpid())
queueB.put(item)
return
# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()
# Fill queueA with data
for i in range(0, 10):
queueA.put("hello" + str(i+1))
# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
# Call processes
process1.start()
process2.start()
# Wait for processes to stop processing
process1.join()
process2.join()
for i in range(0, queueB.qsize()):
print queueB.get()
然后打印出以下内容:
item: hello1. processed by worker 11483
item: hello3. processed by worker 11483
item: hello4. processed by worker 11483
item: hello5. processed by worker 11483
item: hello6. processed by worker 11483
item: hello7. processed by worker 11483
item: hello8. processed by worker 11483
item: hello9. processed by worker 11483
item: hello10. processed by worker 11483
item: hello2. processed by worker 11482
如您所见,其中一个进程只处理其中一个元素,它不会继续获取队列中的更多元素,而另一个进程必须处理其他所有元素。
我认为这是不正确的,或者至少不是我所期望的。你能告诉我实现这个想法的正确方法是什么吗?
你说得对,它们不会完全相等,但这主要是因为你的测试样本太小了。每个进程启动和开始处理都需要时间。处理队列中的一项所需的时间极短,因此一个人可以在另一个人通过之前快速处理 9 个项目。
我在下面测试了这个(在 Python3 中,但它也应该适用于 2.7,只需将 print()
函数更改为 print
语句即可):
import os
import multiprocessing
# Worker function
def worker(queueA, queueB):
for item in iter(queueA.get, 'STOP'):
out = str(os.getpid())
queueB.put(out)
return
# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()
# Fill queueA with data
for i in range(0, 1000):
queueA.put("hello" + str(i+1))
# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
# Call processes
process1.start()
process2.start()
queueA.put('STOP')
queueA.put('STOP')
# Wait for processes to stop processing
process1.join()
process2.join()
all = {}
for i in range(1000):
item = queueB.get()
if item not in all:
all[item] = 1
else:
all[item] += 1
print(all)
我的输出(每个过程完成了多少):
{'18376': 537,
'18377': 463}
虽然它们并不完全相同,但随着我们接近更长的时间,它们将接近相等。
编辑:
确认这一点的另一种方法是在 worker 函数
中添加一个 time.sleep(3)
def worker(queueA, queueB):
for item in iter(queueA.get, 'STOP'):
time.sleep(3)
out = str(os.getpid())
queueB.put(out)
return
我 运行 一个 range(10)
测试就像你原来的例子,得到:
{'18428': 5,
'18429': 5}
我正在摆弄 python 多处理模块。但是有些东西并没有像我期望的那样工作,所以现在我有点困惑。
在 python 脚本中,我创建了两个子进程,因此它们可以使用相同的资源。我在想他们会或多或少地平等地 "share" 负载,但似乎,而不是这样做,一个进程只执行一次,而另一个进程几乎处理所有内容。
为了测试它,我写了下面的代码:
#!/usr/bin/python
import os
import multiprocessing
# Worker function
def worker(queueA, queueB):
while(queueA.qsize() != 0):
item = queueA.get()
item = "item: " + item + ". processed by worker " + str(os.getpid())
queueB.put(item)
return
# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()
# Fill queueA with data
for i in range(0, 10):
queueA.put("hello" + str(i+1))
# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
# Call processes
process1.start()
process2.start()
# Wait for processes to stop processing
process1.join()
process2.join()
for i in range(0, queueB.qsize()):
print queueB.get()
然后打印出以下内容:
item: hello1. processed by worker 11483
item: hello3. processed by worker 11483
item: hello4. processed by worker 11483
item: hello5. processed by worker 11483
item: hello6. processed by worker 11483
item: hello7. processed by worker 11483
item: hello8. processed by worker 11483
item: hello9. processed by worker 11483
item: hello10. processed by worker 11483
item: hello2. processed by worker 11482
如您所见,其中一个进程只处理其中一个元素,它不会继续获取队列中的更多元素,而另一个进程必须处理其他所有元素。
我认为这是不正确的,或者至少不是我所期望的。你能告诉我实现这个想法的正确方法是什么吗?
你说得对,它们不会完全相等,但这主要是因为你的测试样本太小了。每个进程启动和开始处理都需要时间。处理队列中的一项所需的时间极短,因此一个人可以在另一个人通过之前快速处理 9 个项目。
我在下面测试了这个(在 Python3 中,但它也应该适用于 2.7,只需将 print()
函数更改为 print
语句即可):
import os
import multiprocessing
# Worker function
def worker(queueA, queueB):
for item in iter(queueA.get, 'STOP'):
out = str(os.getpid())
queueB.put(out)
return
# IPC Manager
manager = multiprocessing.Manager()
queueA = multiprocessing.Queue()
queueB = multiprocessing.Queue()
# Fill queueA with data
for i in range(0, 1000):
queueA.put("hello" + str(i+1))
# Create processes
process1 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
process2 = multiprocessing.Process(target = worker, args = (queueA, queueB,))
# Call processes
process1.start()
process2.start()
queueA.put('STOP')
queueA.put('STOP')
# Wait for processes to stop processing
process1.join()
process2.join()
all = {}
for i in range(1000):
item = queueB.get()
if item not in all:
all[item] = 1
else:
all[item] += 1
print(all)
我的输出(每个过程完成了多少):
{'18376': 537,
'18377': 463}
虽然它们并不完全相同,但随着我们接近更长的时间,它们将接近相等。
编辑:
确认这一点的另一种方法是在 worker 函数
time.sleep(3)
def worker(queueA, queueB):
for item in iter(queueA.get, 'STOP'):
time.sleep(3)
out = str(os.getpid())
queueB.put(out)
return
我 运行 一个 range(10)
测试就像你原来的例子,得到:
{'18428': 5,
'18429': 5}