How to return a counter dictionary from a function passed to multiprocessing?
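I have a list of CSV files. I want to perform a set of operations on each of them and produce a Counter dictionary per file, and then build a master list that holds the individual Counter dictionaries from all the CSV files. I want to process the CSV files in parallel and return the Counter dictionary from each one. I found a similar solution here: How can I recover the return value of a function passed to multiprocessing.Process?
I used the solution suggested by David Cullen. It works perfectly for strings, but not when I try to return a Counter dict or a normal dict. All the CSV files are processed up to send_end.send(result), where execution hangs forever and then throws a memory error. I am running this on a Linux server that has enough memory to build the list of Counter dicts.
I used the following code: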
import multiprocessing
import os
from collections import Counter
# Get the current working directory
cwd = os.getcwd()
# Take a list of all files in cwd
files = os.listdir(cwd)
# Define the function to run on every CSV file
def worker(f, send_end):
    infile = open(f)
    # Read the lines of the CSV file
    lines = infile.readlines()
    # Split each line on "," and store the result as a list of lists
    master_lst = [line.strip().split(",") for line in lines]
    # Extract the second field of each sublist
    counter_lst = [element[1] for element in master_lst]
    print("Total elements in the list " + str(len(counter_lst)))
    # Count how often each element occurs
    a = Counter(counter_lst)
    # Send the Counter back to the parent process
    send_end.send(a)
def main():
    jobs = []
    pipe_list = []
    for f in files:
        if f.endswith('.csv'):
            recv_end, send_end = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=worker, args=(f, send_end))
            jobs.append(p)
            pipe_list.append(recv_end)
            p.start()
    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(len(result_list))
if __name__ == '__main__':
    main()
The error I get is as follows:
Process Process-42:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
Process Process-17:
Traceback (most recent call last):
Process Process-6:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
Process Process-8:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
self.run()
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
self._target(*self._args, **self._kwargs)
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst_lst)
a = Counter(counter_lst_lst)
a = Counter(counter_lst_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
self.update(iterable, **kwds)
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
self[elem] = self_get(elem, 0) + 1
MemoryError
MemoryError
Process Process-10:
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in
_bootstrap
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/amm/python/collapse_multiprocessing_return.py", line 32, in
worker
a = Counter(counter_lst)
File "/usr/lib64/python2.7/collections.py", line 444, in __init__
self.update(iterable, **kwds)
File "/usr/lib64/python2.7/collections.py", line 526, in update
self[elem] = self_get(elem, 0) + 1
MemoryError
^Z
[18]+ Stopped collapse_multiprocessing_return.py
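If I replace "a" in send_end.send(a) with the file name f, the script prints the number of CSV files in the directory (which is what len(result_list) reports in that case). But when the Counter "a" is sent, it hangs forever and then throws the error above.
I'd like the code to get the Counter dicts to the receiving end without any errors or problems. Is there a way to do this? Can someone suggest a possible solution?
P.S.: I'm new to the multiprocessing module, so apologies if this question sounds naive. I also tried multiprocessing.Manager(), but got a similar error.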
Your traceback mentions Process Process-42:, so at least 42 processes were created. You are starting one process per CSV file, which doesn't help and can lead to memory errors.
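Another likely contributor to the hang: main() calls join() on every process before calling recv(). Connection.send() on a pipe can block until the receiving end reads the data, so a child sending a large Counter may never finish while the parent waits in join(). A short file name fits in the pipe's buffer, which would be consistent with sending f working while sending the Counter hangs. Here is a minimal sketch that keeps the one-process-per-file Pipe design but receives before joining, streams each file instead of calling readlines(), and closes the parent's copy of send_end so that recv() raises EOFError rather than blocking forever if a child dies before sending. count_second_field and collect_counters are hypothetical helper names, and every line is assumed to have at least two comma-separated fields.

```python
import multiprocessing
from collections import Counter


def count_second_field(lines):
    """Count the values in the second comma-separated column of `lines`."""
    return Counter(line.strip().split(",")[1] for line in lines)


def worker(path, send_end):
    # Iterate over the file lazily instead of building full lists with readlines().
    with open(path) as infile:
        counts = count_second_field(infile)
    send_end.send(counts)
    send_end.close()


def collect_counters(paths):
    """Start one process per path and return the Counters in input order."""
    jobs, pipes = [], []
    for path in paths:
        recv_end, send_end = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=worker, args=(path, send_end))
        p.start()
        # Drop the parent's copy of the write end: only the child can write now,
        # so recv() gets EOFError instead of hanging if that child dies early.
        send_end.close()
        jobs.append(p)
        pipes.append(recv_end)
    # Receive BEFORE joining: a child blocked in send() cannot exit until its
    # data has been read, so join() first can deadlock on large objects.
    results = [conn.recv() for conn in pipes]
    for p in jobs:
        p.join()
    return results
```

Call it from under the if __name__ == '__main__': guard, for example collect_counters([f for f in files if f.endswith('.csv')]). It still starts one process per file, so with dozens of files the Pool.map approach is likely the better fit.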
Your problem can be solved more simply with multiprocessing.Pool.map. The worker function can also be shortened considerably:
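import multiprocessing
import os
from collections import Counter
def worker(f):
    # Count the values in the second comma-separated field of each line
    with open(f) as infile:
        return Counter(line.strip().split(",")[1]
                       for line in infile)
def main():
    files = os.listdir(os.getcwd())
    pool = multiprocessing.Pool()
    result_list = pool.map(worker, [f for f in files if f.endswith('.csv')])
if __name__ == '__main__':
    main()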
Passing no arguments to Pool means it creates as many worker processes as you have CPU cores. Using more may or may not improve performance.
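To cap the number of worker processes (and with it peak memory use) and optionally also get one combined total, something along these lines should work. It assumes Python 3.3+, where Pool can be used in a with statement; count_file, merge_counters and count_csv_files are hypothetical names, and the combined total is an optional extra beyond the per-file list the question asks for.

```python
import multiprocessing
import os
from collections import Counter


def count_file(path):
    # Count the second comma-separated field of every line in one file.
    with open(path) as infile:
        return Counter(line.strip().split(",")[1] for line in infile)


def merge_counters(counters):
    """Add per-file Counters together into one overall tally."""
    total = Counter()
    for c in counters:
        total.update(c)
    return total


def count_csv_files(directory, processes=None):
    paths = [os.path.join(directory, f)
             for f in os.listdir(directory) if f.endswith(".csv")]
    # processes=None lets Pool use os.cpu_count(); pass a smaller number
    # to limit how many files are counted at the same time.
    with multiprocessing.Pool(processes=processes) as pool:
        per_file = pool.map(count_file, paths)  # results keep input order
    return per_file, merge_counters(per_file)
```

Under the __main__ guard, per_file, total = count_csv_files(os.getcwd(), processes=4) gives the master list of per-file Counters plus their sum; the value 4 is only illustrative.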