Python 当数据集太大时,多处理示例永远不会终止
Python multiprocessing example never terminates when the dataset is too large
在下面的示例问题中,主程序创建了一个长度为 data_size
的随机字符串列表。如果没有 multi-processing,数据将直接发送到 Test.iterate()
,其中 class 只是将字符串 Test-
添加到每个随机字符串的开头。当 运行 没有多处理时,代码在 data_size
的小值和 data_size
.
的大值下工作得很好
我决定为这个测试问题添加多处理能力,并将多处理的核心组件分解为class标题MultiProc
。成员函数 Multiproc.run_processes()
管理 class 中的所有函数。该函数假定输入列表将根据用户希望使用的进程数分成 x 个较小的列表。因此,该函数首先确定每个 sub-list 相对于初始列表的上限和下限索引,以便代码知道为每个线程迭代哪些部分。该函数然后启动进程,启动进程,加入进程,从 Queue
中提取数据,然后它 re-orders 基于传递给主函数的计数器返回的数据。 MultiProc class 在 data_size
的小值下工作得很好,但在 ~500 以上的值上,代码永远不会终止,尽管我怀疑该值会因计算机而异,具体取决于内存。但是,在某些时候,多进程函数停止工作,我怀疑它与从多进程返回数据的方式有关。有谁知道可能导致此问题的原因以及如何解决它?
from multiprocessing import Process, Queue
from itertools import chain
import string
import random
class Test:
def __init__(self, array_list):
self.array_list = array_list
def func(self, names):
return 'Test-' + names
def iterate(self, upper, lower, counter):
output = [self.func(self.array_list[i]) for i in range(lower, upper)]
return output, counter
class MultiProc:
def __init__(self, num_procs, data_array, func):
self.num_procs = num_procs
self.data_array = data_array
self.func = func
if self.num_procs > len(self.data_array):
self.num_procs = len(self.data_array)
self.length = int((len(self.data_array) / self.num_procs) // 1)
def run_processes(self):
upper = self.__determine_upper_indices()
lower = self.__determine_lower_indices(upper)
p, q = self.__initiate_proc(self.func, upper, lower)
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
new = self.__reorder_data(results)
return new
def __determine_upper_indices(self):
upper = [i * self.length for i in range(1, self.num_procs)]
upper.append(len(self.data_array))
return upper
def __determine_lower_indices(self, upper):
lower = [upper[i] for i in range(len(upper) - 1)]
lower = [0] + lower
return lower
def __initiate_proc(self, func, upper, lower):
q = Queue()
p = [Process(target=self.run_and_send_back_output,
args=(q, func, upper[i], lower[i], i))
for i in range(self.num_procs)]
return p, q
def __start_thread(self, p):
[p[i].start() for i in range(self.num_procs)]
def __join_threads(self, p):
[p[i].join() for i in range(self.num_procs)]
def __extract_data(self, q):
results = []
while not q.empty():
results.extend(q.get())
return results
def __reorder_data(self, results):
new = [results[i - 1] for j in range(self.num_procs)
for i in range(len(results)) if results[i] == j]
new = list(chain.from_iterable(new))
return new
def run_and_send_back_output(self, queue, func, *args):
result = func(*args) # run the func
queue.put(result) # send the result back
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 9
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
obj1 = Test(test_list)
result1 = obj1.iterate(data_size, 0, 1)
print(result1)
multi = MultiProc(num_proc, test_list, obj1.iterate)
result2 = multi.run_processes()
print(result2)
# >> ['Test-2HAFCF', 'Test-GWPBBB', 'Test-W43JFL', 'Test-HA65PE',
# 'Test-83EF6C', 'Test-R9ET4W', 'Test-RPM37B', 'Test-6EAVJ4',
# 'Test-YKDE5K']
你的主要问题是:
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
你启动你的工作人员试图将一些东西放入队列,然后加入工作人员,然后才开始从队列中检索数据。然而,工作人员只能在所有数据都已刷新到底层管道后才能退出,否则将在退出时阻塞。在开始从管道检索元素之前加入像这样阻塞的进程可能会导致死锁。
也许您应该研究一下 multiprocessing.Pool
, as what you're trying to implement is some kind of a map()
操作。您的示例可以像这样更优雅地重写:
from multiprocessing import Pool
import string
import random
def func(name):
return 'Test-' + name
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 5000
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
with Pool(num_proc) as pool:
result = pool.map(func, test_list)
print(result)
在下面的示例问题中,主程序创建了一个长度为 data_size
的随机字符串列表。如果没有 multi-processing,数据将直接发送到 Test.iterate()
,其中 class 只是将字符串 Test-
添加到每个随机字符串的开头。当 运行 没有多处理时,代码在 data_size
的小值和 data_size
.
我决定为这个测试问题添加多处理能力,并将多处理的核心组件分解为class标题MultiProc
。成员函数 Multiproc.run_processes()
管理 class 中的所有函数。该函数假定输入列表将根据用户希望使用的进程数分成 x 个较小的列表。因此,该函数首先确定每个 sub-list 相对于初始列表的上限和下限索引,以便代码知道为每个线程迭代哪些部分。该函数然后启动进程,启动进程,加入进程,从 Queue
中提取数据,然后它 re-orders 基于传递给主函数的计数器返回的数据。 MultiProc class 在 data_size
的小值下工作得很好,但在 ~500 以上的值上,代码永远不会终止,尽管我怀疑该值会因计算机而异,具体取决于内存。但是,在某些时候,多进程函数停止工作,我怀疑它与从多进程返回数据的方式有关。有谁知道可能导致此问题的原因以及如何解决它?
from multiprocessing import Process, Queue
from itertools import chain
import string
import random
class Test:
def __init__(self, array_list):
self.array_list = array_list
def func(self, names):
return 'Test-' + names
def iterate(self, upper, lower, counter):
output = [self.func(self.array_list[i]) for i in range(lower, upper)]
return output, counter
class MultiProc:
def __init__(self, num_procs, data_array, func):
self.num_procs = num_procs
self.data_array = data_array
self.func = func
if self.num_procs > len(self.data_array):
self.num_procs = len(self.data_array)
self.length = int((len(self.data_array) / self.num_procs) // 1)
def run_processes(self):
upper = self.__determine_upper_indices()
lower = self.__determine_lower_indices(upper)
p, q = self.__initiate_proc(self.func, upper, lower)
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
new = self.__reorder_data(results)
return new
def __determine_upper_indices(self):
upper = [i * self.length for i in range(1, self.num_procs)]
upper.append(len(self.data_array))
return upper
def __determine_lower_indices(self, upper):
lower = [upper[i] for i in range(len(upper) - 1)]
lower = [0] + lower
return lower
def __initiate_proc(self, func, upper, lower):
q = Queue()
p = [Process(target=self.run_and_send_back_output,
args=(q, func, upper[i], lower[i], i))
for i in range(self.num_procs)]
return p, q
def __start_thread(self, p):
[p[i].start() for i in range(self.num_procs)]
def __join_threads(self, p):
[p[i].join() for i in range(self.num_procs)]
def __extract_data(self, q):
results = []
while not q.empty():
results.extend(q.get())
return results
def __reorder_data(self, results):
new = [results[i - 1] for j in range(self.num_procs)
for i in range(len(results)) if results[i] == j]
new = list(chain.from_iterable(new))
return new
def run_and_send_back_output(self, queue, func, *args):
result = func(*args) # run the func
queue.put(result) # send the result back
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 9
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
obj1 = Test(test_list)
result1 = obj1.iterate(data_size, 0, 1)
print(result1)
multi = MultiProc(num_proc, test_list, obj1.iterate)
result2 = multi.run_processes()
print(result2)
# >> ['Test-2HAFCF', 'Test-GWPBBB', 'Test-W43JFL', 'Test-HA65PE',
# 'Test-83EF6C', 'Test-R9ET4W', 'Test-RPM37B', 'Test-6EAVJ4',
# 'Test-YKDE5K']
你的主要问题是:
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
你启动你的工作人员试图将一些东西放入队列,然后加入工作人员,然后才开始从队列中检索数据。然而,工作人员只能在所有数据都已刷新到底层管道后才能退出,否则将在退出时阻塞。在开始从管道检索元素之前加入像这样阻塞的进程可能会导致死锁。
也许您应该研究一下 multiprocessing.Pool
, as what you're trying to implement is some kind of a map()
操作。您的示例可以像这样更优雅地重写:
from multiprocessing import Pool
import string
import random
def func(name):
return 'Test-' + name
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 5000
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
with Pool(num_proc) as pool:
result = pool.map(func, test_list)
print(result)