What is the proper way to use a shared list in multiprocessing?
I have implemented a SharedList in Python (version 3.7) with the help of Manager and Lock from multiprocessing. I use it as a shared object among processes created with the multiprocessing Process function call. The shared list stores the values/objects produced by each process that shares it.

Implementation of SharedList using Manager and Lock from Python's multiprocessing:
from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        # proxy list managed by a separate server process
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            return list(self.results)
Using the created SharedList to store the values from multiple processes created with multiprocessing:
results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    # args must be a tuple: (results,) rather than (results)
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
Implementation of child_function:
while True:
    result = func()
    if not results.append(result):
        break
The implementation works for some scenarios, but hangs when I increase the limit. I ran the same experiment with fewer processes than CPUs and it still hangs at the same place.

Is there a better way to solve the above problem? I have looked into different approaches, such as using Queue, but they did not work as expected and hung as well.
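As an aside, one way to locate such a hang is to make the stuck process print its own stack traces. A minimal sketch using the standard-library faulthandler module (call this near the top of each process you want traced; the 60-second timeout is an arbitrary choice for illustration):

import faulthandler

# if the process is still running after 60 seconds, dump the stack
# trace of every thread to stderr so the blocking call becomes visible
faulthandler.dump_traceback_later(60, exit=False)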
Adding the earlier implementation that used Queue:
import multiprocessing
from time import sleep

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

# give the workers a moment to start before queueing tasks
sleep(5)
for i in range(limit):
    tasks.put(0)
sleep(1)

# one -1 sentinel per worker tells it to stop
for i in range(num_processes):
    tasks.put(-1)

# drain results until every worker has reported its sentinel back
num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()
In child_function:
while True:
    task_val = tasks.get()
    if task_val < 0:
        # echo the stop sentinel back so the parent can count finished workers
        results.put(-1)
        break
    else:
        result = func()
        results.put(result)
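One fragility in the drain loop above: results.get() blocks forever if a worker dies before posting its -1 sentinel. A hedged variant of that loop, assuming the same manager queues and variables as above, that times out instead of hanging (the 30-second timeout is an arbitrary choice):

import queue

num_finished_processes = 0
while num_finished_processes < num_processes:
    try:
        new_result = results.get(timeout=30)
    except queue.Empty:
        # nothing arrived in time: a worker has likely crashed without
        # posting its -1 sentinel, so stop draining instead of hanging
        break
    if new_result == -1:
        num_finished_processes += 1
    else:
        results_out.append(new_result)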
Updated:
Before posting this question, I had read the references below but could not get the desired output. I agree that this code leads to a deadlock state, but I was unable to find a deadlock-free implementation using multiprocessing in Python; for comparison, a minimal Pool-based sketch follows the references.
References:
Multiprocessing of shared list
Shared variable in python's multiprocessing
https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba
http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/
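Here is that minimal deadlock-free sketch using multiprocessing.Pool, which handles worker startup, task distribution, and result collection internally. It assumes func (the per-value producer from the question) is defined at module level so it can be pickled, and that exactly limit values are wanted:

from multiprocessing import Pool

def worker(_):
    # func is the same per-value producer used in the question
    return func()

if __name__ == '__main__':
    num_processes = min(process_count, limit)
    with Pool(processes=num_processes) as pool:
        # run exactly `limit` tasks; map blocks until all results are in
        results_out = pool.map(worker, range(limit))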
Based on the suggestions, I was able to modify SharedList to use a Queue:
class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        # enqueue one task token per expected value, then one -1 sentinel
        # per worker; the sleeps only paper over startup races
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        # drain results until every worker has posted its -1 sentinel
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out
This implementation works fine, with the following changes to the driver code:
results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
Implementation of child_function:
while True:
    result = func()
    if not results.append(result):
        break
However, after some iterations this again ran into a deadlock and hung; one likely cause is sketched below.
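If func() raises inside a worker, that worker exits without ever consuming its -1 task token through append(), so list() waits forever for a sentinel that never arrives. A hedged sketch of a child loop that guarantees the sentinel, assuming the Queue-based SharedList above (it reaches into results.results directly for brevity; a cleaner design would expose an abort() method on SharedList):

def child_function(results):
    try:
        while True:
            result = func()
            if not results.append(result):
                break
    except Exception:
        # the worker is dying early: post the -1 sentinel ourselves so
        # the parent's list() loop still sees one sentinel per worker
        results.results.put(-1)
        raise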
Finally, I found the following article based on Ray, which sounds interesting and easy to use for implementing parallel computation, both effective and time-efficient.
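For reference, a minimal sketch of the same fan-out pattern in Ray (assuming func is again the per-value producer, defined at module level):

import ray

ray.init()

@ray.remote
def produce_one(i):
    # func is the same per-value producer used in the question
    return func()

# launch `limit` tasks in parallel and gather their results as a list;
# ray.get blocks until every task has finished
results_out = ray.get([produce_one.remote(i) for i in range(limit)])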