What is the proper way to use a shared list with multiprocessing?

I have implemented a SharedList in Python (version 3.7) with the help of Manager and Lock from multiprocessing. I use it as a shared object among processes created with the multiprocessing Process call. The SharedList is used to store the values/objects produced by the individual processes and share them across all of the processes.

Implementation of SharedList using Manager and Lock from Python's multiprocessing:

from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        # Manager-backed list proxy, shared across processes
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            # return a plain-list snapshot of the proxy
            return list(self.results)

Using the created SharedList to store the values produced by the multiple processes created with multiprocessing:

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    # note the trailing comma: args must be a tuple
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function:

while True:
    result = func()
    if not results.append(result):
        break
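
For reference, a minimal self-contained version of this first approach might look like the sketch below, with the `if __name__ == '__main__'` guard that multiprocessing requires. Here func, process_count, and limit are placeholders, and passing the Manager-backed SharedList through args assumes the fork start method (the default on Linux):

from multiprocessing import Process

def func():
    return 42  # placeholder for the real work

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break

if __name__ == '__main__':
    limit = 10          # placeholder values
    process_count = 4
    results = SharedList(limit)   # SharedList as defined above
    num_processes = min(process_count, limit)
    processes = [Process(target=child_function, args=(results,))
                 for _ in range(num_processes)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(results.list())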

The implementation works for some scenarios, but hangs when I increase the limit. I ran the same experiment using fewer processes than CPUs, and it still hangs at the same place.

Is there a better way to solve the above problem? I have looked into different approaches, such as using a Queue, but that did not work as expected and also hung.

Adding the earlier implementation that used a Queue:

Implementation using Queue:

import multiprocessing
from time import sleep

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

sleep(5)
for i in range(limit):
    tasks.put(0)            # one task token per desired result
sleep(1)

for i in range(num_processes):
    tasks.put(-1)           # one poison pill per worker

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

Implementation of child_function:

while True:
    task_val = tasks.get()
    if task_val < 0:
        results.put(-1)     # echo the poison pill so the parent can count exits
        break
    else:
        result = func()
        results.put(result)
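
For comparison, here is the same poison-pill pattern sketched with plain multiprocessing.Queue objects instead of Manager queues. The key ordering constraint is to drain the results queue before joining the workers, because a worker cannot exit while its queue feeder thread still holds undelivered items. func is a placeholder and is assumed never to return -1:

import multiprocessing

def func():
    return 1  # placeholder; assumed never to return -1

def worker(tasks, results):
    while True:
        if tasks.get() < 0:          # poison pill: stop this worker
            results.put(-1)          # echo it so the parent can count exits
            break
        results.put(func())

if __name__ == '__main__':
    limit, process_count = 20, 4     # placeholder values
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    num_processes = min(process_count, limit)
    workers = [multiprocessing.Process(target=worker, args=(tasks, results))
               for _ in range(num_processes)]
    for w in workers:
        w.start()
    for _ in range(limit):
        tasks.put(0)                 # one task token per desired result
    for _ in range(num_processes):
        tasks.put(-1)                # one poison pill per worker
    results_out, finished = [], 0
    while finished < num_processes:  # drain results *before* join()
        value = results.get()
        if value == -1:
            finished += 1
        else:
            results_out.append(value)
    for w in workers:
        w.join()
    print(results_out)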

Update

Before posting this question, I had read the references below, but I was unable to get the desired output. I agree that this code leads to a deadlock state, but I could not find a deadlock-free implementation using multiprocessing in Python.

References

  1. Multiprocessing of shared list

  2. https://pymotw.com/2/multiprocessing/basics.html

  3. Shared variable in python's multiprocessing

  4. https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

  5. https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

  6. http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/

Based on the suggestions, I was able to modify SharedList to use Queue:

from multiprocessing import Manager
from time import sleep

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        # process_count is assumed to be a module-level setting
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        for i in range(self.limit):
            self.tasks.put(0)        # one task token per desired result
        sleep(1)
        for i in range(self.no_of_process):
            self.tasks.put(-1)       # one poison pill per worker

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)     # echo the poison pill
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

This implementation works fine, with the following changes to the driver code:

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function:

while True:
    result = func()
    if not results.append(result):
        break

However, after some iterations this again ran into a deadlock and hung.
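
For comparison, a standard-library pattern that collects a bounded number of results without hand-rolled queues or sentinels is concurrent.futures.ProcessPoolExecutor. A minimal sketch, assuming func takes no arguments and is picklable (names here are placeholders):

from concurrent.futures import ProcessPoolExecutor, as_completed

def func():
    return 1  # placeholder for the real work

if __name__ == '__main__':
    limit, process_count = 20, 4     # placeholder values
    results_out = []
    with ProcessPoolExecutor(max_workers=min(process_count, limit)) as pool:
        futures = [pool.submit(func) for _ in range(limit)]
        for future in as_completed(futures):
            results_out.append(future.result())
    print(results_out)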


I found the following article based on Ray, which sounds interesting and easy to use for implementing parallel computation both effectively and time-efficiently:

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8
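
A minimal sketch of the pattern the article describes, assuming Ray is installed and func is a placeholder for the unit of work:

import ray

ray.init()

@ray.remote
def func():
    return 1  # placeholder for the real work

limit = 20  # placeholder value
# schedule `limit` remote tasks and block until all results are ready
results_out = ray.get([func.remote() for _ in range(limit)])
print(results_out)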