在 Python 中的多个进程之间共享一个存储对象的字典
Share a dictionary storing objects between several processes in Python
我正在编写一个大型脚本,其主要目的是读取许多文件的内容并将每个元素的编号存储在字典中。如果字典中不存在该元素,那么我们将创建某个对象的新实例,然后递增,否则只会递增。由于要处理的每个文件本身都很大,有时我需要处理 100 多个文件,所以我想加快速度并利用 Python 的多处理模块。这是脚本的简化版本(我用...隐藏了路径,它不是真实的):
import multiprocessing as mp
from os import listdir
from os.path import join
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
class TestClass:
def __init__(self):
self._number = 0
def increment(self):
self._number += 1
def worker(file):
f = open(file, 'r')
for line in f.readlines():
if line not in dictionary:
dictionary[line] = TestClass()
dictionary[line].increment()
def _list_files():
for f in listdir("..."):
queue.put(join("...", f))
def pool():
_list_files()
_pool = mp.Pool(mp.cpu_count())
for i in range(len(queue)):
_pool.apply(worker, args=(queue.get()))
_pool.close()
_pool.join()
pool()
print(dictionary)
问题是脚本崩溃并显示消息:
AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>
有什么方法可以让它工作吗?
我不是脚本初始版本的创建者,我只是为其添加了一些功能。鉴于此,脚本的结构必须保持不变,因为重写它会花费太多时间,即 TestClass
、worker
和 list_files
不能改变它们的结构(除了所有东西与多处理连接)
(看来您之前post回答过这个问题。)
您的示例代码由于多种原因无法运行,其中最重要的是 ...
没有做任何有用的事情:
$ python tst.py
Traceback (most recent call last):
File "tst.py", line 38, in <module>
pool()
File "tst.py", line 29, in pool
_list_files()
File "tst.py", line 25, in _list_files
for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'
(对于不会 运行 的 post 代码来说,这不是一个好的形式,但是 是 提供一个 MCVE.) 所以我修正了:
index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
from os import listdir
from os.path import join
+DIRPATH = 'inputs'
+
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
dictionary[line].increment()
def _list_files():
- for f in listdir("..."):
- queue.put(join("...", f))
+ for f in listdir(DIRPATH):
+ queue.put(join(DIRPATH, f))
def pool():
_list_files()
并用一个示例输入文件创建了一个 inputs/
目录:
$ ls inputs
one
$ cat inputs/one
1
one
unum
现在这个例子产生:
$ python tst.py
Traceback (most recent call last):
File "tst.py", line 40, in <module>
pool()
File "tst.py", line 34, in pool
for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()
现在,我不会说这个重写 很好 ,但我继续将它重写成 可以 工作的东西:
import multiprocessing as mp
from os import listdir
from os.path import join
DIRPATH = 'inputs'
class TestClass:
def __repr__(self):
return str(self._number)
def __init__(self):
self._number = 0
def increment(self):
self._number += 1
def worker(dictionary, queue):
while True:
path = queue.get()
if path is None:
return
f = open(path, 'r')
for line in f.readlines():
if line not in dictionary:
dictionary[line] = TestClass()
dictionary[line].increment()
def run_pool():
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
nworkers = mp.cpu_count()
pool = mp.Pool(nworkers)
for i in range(nworkers):
pool.apply_async(worker, args=(dictionary, queue))
for f in listdir(DIRPATH):
queue.put(join(DIRPATH, f))
for i in range(nworkers):
queue.put(None)
pool.close()
pool.join()
return dictionary
def main():
dictionary = run_pool()
print(dictionary)
if __name__ == '__main__':
main()
主要区别是:
我删除了所有的全局变量。管理器实例、托管队列和托管字典都是 run_pool
.
的本地
我在创建 nworker
工作人员后 将文件名称放入队列 。每个 worker 运行 一个循环,读取文件名,直到它读取一个 None
名称,然后 returns 它的 (None) 结果。
主循环将文件名放入队列中,以便工作人员在完成每个前一个文件时可以将文件名从队列中拉出。为了向所有 nworkers
工作人员发出退出信号,主循环将那么多 None
条目添加到队列中。
run_pool
returns 最终(仍然管理)字典。
当然,我向您的 TestClass
对象添加了一个 __repr__
,以便我们可以看到计数。我还通过将 main
驱动程序移动到一个仅在 __name__ == '__main__'
.
时调用的函数来确保代码应该在 Windows 上工作
我正在编写一个大型脚本,其主要目的是读取许多文件的内容并将每个元素的编号存储在字典中。如果字典中不存在该元素,那么我们将创建某个对象的新实例,然后递增,否则只会递增。由于要处理的每个文件本身都很大,有时我需要处理 100 多个文件,所以我想加快速度并利用 Python 的多处理模块。这是脚本的简化版本(我用...隐藏了路径,它不是真实的):
import multiprocessing as mp
from os import listdir
from os.path import join
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
class TestClass:
def __init__(self):
self._number = 0
def increment(self):
self._number += 1
def worker(file):
f = open(file, 'r')
for line in f.readlines():
if line not in dictionary:
dictionary[line] = TestClass()
dictionary[line].increment()
def _list_files():
for f in listdir("..."):
queue.put(join("...", f))
def pool():
_list_files()
_pool = mp.Pool(mp.cpu_count())
for i in range(len(queue)):
_pool.apply(worker, args=(queue.get()))
_pool.close()
_pool.join()
pool()
print(dictionary)
问题是脚本崩溃并显示消息:
AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>
有什么方法可以让它工作吗?
我不是脚本初始版本的创建者,我只是为其添加了一些功能。鉴于此,脚本的结构必须保持不变,因为重写它会花费太多时间,即 TestClass
、worker
和 list_files
不能改变它们的结构(除了所有东西与多处理连接)
(看来您之前post回答过这个问题。)
您的示例代码由于多种原因无法运行,其中最重要的是 ...
没有做任何有用的事情:
$ python tst.py
Traceback (most recent call last):
File "tst.py", line 38, in <module>
pool()
File "tst.py", line 29, in pool
_list_files()
File "tst.py", line 25, in _list_files
for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'
(对于不会 运行 的 post 代码来说,这不是一个好的形式,但是 是 提供一个 MCVE.) 所以我修正了:
index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
from os import listdir
from os.path import join
+DIRPATH = 'inputs'
+
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
dictionary[line].increment()
def _list_files():
- for f in listdir("..."):
- queue.put(join("...", f))
+ for f in listdir(DIRPATH):
+ queue.put(join(DIRPATH, f))
def pool():
_list_files()
并用一个示例输入文件创建了一个 inputs/
目录:
$ ls inputs
one
$ cat inputs/one
1
one
unum
现在这个例子产生:
$ python tst.py
Traceback (most recent call last):
File "tst.py", line 40, in <module>
pool()
File "tst.py", line 34, in pool
for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()
现在,我不会说这个重写 很好 ,但我继续将它重写成 可以 工作的东西:
import multiprocessing as mp
from os import listdir
from os.path import join
DIRPATH = 'inputs'
class TestClass:
def __repr__(self):
return str(self._number)
def __init__(self):
self._number = 0
def increment(self):
self._number += 1
def worker(dictionary, queue):
while True:
path = queue.get()
if path is None:
return
f = open(path, 'r')
for line in f.readlines():
if line not in dictionary:
dictionary[line] = TestClass()
dictionary[line].increment()
def run_pool():
manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()
nworkers = mp.cpu_count()
pool = mp.Pool(nworkers)
for i in range(nworkers):
pool.apply_async(worker, args=(dictionary, queue))
for f in listdir(DIRPATH):
queue.put(join(DIRPATH, f))
for i in range(nworkers):
queue.put(None)
pool.close()
pool.join()
return dictionary
def main():
dictionary = run_pool()
print(dictionary)
if __name__ == '__main__':
main()
主要区别是:
我删除了所有的全局变量。管理器实例、托管队列和托管字典都是
run_pool
. 的本地
我在创建
nworker
工作人员后 将文件名称放入队列 。每个 worker 运行 一个循环,读取文件名,直到它读取一个None
名称,然后 returns 它的 (None) 结果。主循环将文件名放入队列中,以便工作人员在完成每个前一个文件时可以将文件名从队列中拉出。为了向所有
nworkers
工作人员发出退出信号,主循环将那么多None
条目添加到队列中。run_pool
returns 最终(仍然管理)字典。
当然,我向您的 TestClass
对象添加了一个 __repr__
,以便我们可以看到计数。我还通过将 main
驱动程序移动到一个仅在 __name__ == '__main__'
.