Python 当数据集太大时,多处理示例永远不会终止

Python multiprocessing example never terminates when the dataset is too large

在下面的示例问题中,主程序创建了一个长度为 data_size 的随机字符串列表。如果没有 multi-processing,数据将直接发送到 Test.iterate(),其中 class 只是将字符串 Test- 添加到每个随机字符串的开头。当 运行 没有多处理时,代码在 data_size 的小值和 data_size.

的大值下工作得很好

我决定为这个测试问题添加多处理能力,并将多处理的核心组件分解为class标题MultiProc。成员函数 Multiproc.run_processes() 管理 class 中的所有函数。该函数假定输入列表将根据用户希望使用的进程数分成 x 个较小的列表。因此,该函数首先确定每个 sub-list 相对于初始列表的上限和下限索引,以便代码知道为每个线程迭代哪些部分。该函数然后启动进程,启动进程,加入进程,从 Queue 中提取数据,然后它 re-orders 基于传递给主函数的计数器返回的数据。 MultiProc class 在 data_size 的小值下工作得很好,但在 ~500 以上的值上,代码永远不会终止,尽管我怀疑该值会因计算机而异,具体取决于内存。但是,在某些时候,多进程函数停止工作,我怀疑它与从多进程返回数据的方式有关。有谁知道可能导致此问题的原因以及如何解决它?

from multiprocessing import Process, Queue
from itertools import chain
import string
import random


class Test:
    def __init__(self, array_list):
        self.array_list = array_list

    def func(self, names):
        return 'Test-' + names

    def iterate(self, upper, lower, counter):
        output = [self.func(self.array_list[i]) for i in range(lower, upper)]
        return output, counter


class MultiProc:
    def __init__(self, num_procs, data_array, func):
        self.num_procs = num_procs
        self.data_array = data_array
        self.func = func
        if self.num_procs > len(self.data_array):
            self.num_procs = len(self.data_array)
        self.length = int((len(self.data_array) / self.num_procs) // 1)

    def run_processes(self):
        upper = self.__determine_upper_indices()
        lower = self.__determine_lower_indices(upper)
        p, q = self.__initiate_proc(self.func, upper, lower)
        self.__start_thread(p)
        self.__join_threads(p)
        results = self.__extract_data(q)
        new = self.__reorder_data(results)
        return new

    def __determine_upper_indices(self):
        upper = [i * self.length for i in range(1, self.num_procs)]
        upper.append(len(self.data_array))
        return upper

    def __determine_lower_indices(self, upper):
        lower = [upper[i] for i in range(len(upper) - 1)]
        lower = [0] + lower
        return lower

    def __initiate_proc(self, func, upper, lower):
        q = Queue()
        p = [Process(target=self.run_and_send_back_output,
                     args=(q, func, upper[i], lower[i], i))
                     for i in range(self.num_procs)]
        return p, q

    def __start_thread(self, p):
        [p[i].start() for i in range(self.num_procs)]

    def __join_threads(self, p):
        [p[i].join() for i in range(self.num_procs)]

    def __extract_data(self, q):
        results = []
        while not q.empty():
            results.extend(q.get())
        return results

    def __reorder_data(self, results):
        new = [results[i - 1] for j in range(self.num_procs)
               for i in range(len(results)) if results[i] == j]
        new = list(chain.from_iterable(new))
        return new

    def run_and_send_back_output(self, queue, func, *args):
        result = func(*args)  # run the func
        queue.put(result)    # send the result back

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

if __name__ == "__main__":
    random.seed(1234)
    data_size = 9
    num_proc = 2
    test_list = [id_generator() for i in range(data_size)]
    obj1 = Test(test_list)
    result1 = obj1.iterate(data_size, 0, 1)
    print(result1)
    multi = MultiProc(num_proc, test_list, obj1.iterate)
    result2 = multi.run_processes()
    print(result2)
    # >> ['Test-2HAFCF', 'Test-GWPBBB', 'Test-W43JFL', 'Test-HA65PE',
    #     'Test-83EF6C', 'Test-R9ET4W', 'Test-RPM37B', 'Test-6EAVJ4',
    #     'Test-YKDE5K']

你的主要问题是:

    self.__start_thread(p)
    self.__join_threads(p)
    results = self.__extract_data(q)

你启动你的工作人员试图将一些东西放入队列,然后加入工作人员,然后才开始从队列中检索数据。然而,工作人员只能在所有数据都已刷新到底层管道后才能退出,否则将在退出时阻塞。在开始从管道检索元素之前加入像这样阻塞的进程可能会导致死锁。

也许您应该研究一下 multiprocessing.Pool, as what you're trying to implement is some kind of a map() 操作。您的示例可以像这样更优雅地重写:

from multiprocessing import Pool
import string
import random


def func(name):
    return 'Test-' + name

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

if __name__ == "__main__":
    random.seed(1234)
    data_size = 5000
    num_proc = 2
    test_list = [id_generator() for i in range(data_size)]
    with Pool(num_proc) as pool:
        result = pool.map(func, test_list)
    print(result)