Python 多个进程 consuming/iterating 在单个生成器上(分而治之)
Python multiple processes consuming/iterating over single generator (divide and conquer)
我有一个 python 生成器,可以生成 returns 很多项目,例如:
import itertools
def generate_random_strings():
chars = "ABCDEFGH"
for item in itertools.product(chars, repeat=10):
yield "".join(item)
然后我对此进行迭代并执行各种任务,问题是我为此只使用了一个 thread/process:
my_strings = generate_random_strings()
for string in my_strings:
# do something with string...
print(string)
效果很好,我正在获取所有字符串,但速度很慢。我想利用 Python 多处理的力量来 "divide and conquer" 这个 for 循环。但是,当然,我希望每个字符串只处理一次。虽然我找到了很多关于多处理的文档,但我正在尝试用最少的代码找到最简单的解决方案。
我假设每个线程每次都应该获取一大块项目并在返回并获取另一大块等之前处理它们......
非常感谢,
代码最少的最简单的解决方案?多处理上下文管理器。
我假设您可以将 "do something with string" 放入一个名为 "do_something"
的函数中
from multiprocessing import Pool as ProcessPool
number_of_processes = 4
with ProcessPool(number_of_processes) as pool:
pool.map(do_something, my_strings)
如果您想再次获得 "do_something" 的结果,很简单!
with ProcessPool(number_of_processes) as pool:
results = pool.map(do_something, my_strings)
您会在列表中找到它们。
Multiprocessing.dummy 是进程池的语法包装器,可让您使用多处理语法。如果你想要线程而不是进程,只需这样做:
from multiprocessing.dummy import Pool as ThreadPool
您可以使用 multiprocessing
.
import multiprocessing
def string_fun(string):
# do something with string...
print(string)
my_strings = generate_random_strings()
num_of_threads = 7
pool = multiprocessing.Pool(num_of_threads)
pool.map(string_fun, my_strings)
假设您使用的是最新版本的 Python,您可能想阅读一些关于 asyncio 模块的内容。由于GIL锁,多线程不易实现:"In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe."
因此您可以交换多处理,或者如上文所述,查看 asycio 模块。
asyncio — 异步 I/O > https://docs.python.org/3/library/asyncio.html
我会尽快将此答案与一些代码整合。
希望对你有帮助,
和乐
正如@Hele 提到的,asyncio 是最好的,这里有一个例子
代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# python 3.7.2
from asyncio import ensure_future, gather, run
import random
alphabet = 'ABCDEFGH'
size = 1000
async def generate():
tasks = list()
result = None
for el in range(1, size):
task = ensure_future(generate_one())
tasks.append(task)
result = await gather(*tasks)
return list(set(result))
async def generate_one():
return ''.join(random.choice(alphabet) for i in range(8))
if __name__ == '__main__':
my_strings = run(generate())
print(my_strings)
输出
['CHABCGDD', 'ACBGAFEB', ...
当然,你需要改进generate_one,这个变种很慢。
您可以查看源代码here。
我有一个 python 生成器,可以生成 returns 很多项目,例如:
import itertools
def generate_random_strings():
chars = "ABCDEFGH"
for item in itertools.product(chars, repeat=10):
yield "".join(item)
然后我对此进行迭代并执行各种任务,问题是我为此只使用了一个 thread/process:
my_strings = generate_random_strings()
for string in my_strings:
# do something with string...
print(string)
效果很好,我正在获取所有字符串,但速度很慢。我想利用 Python 多处理的力量来 "divide and conquer" 这个 for 循环。但是,当然,我希望每个字符串只处理一次。虽然我找到了很多关于多处理的文档,但我正在尝试用最少的代码找到最简单的解决方案。 我假设每个线程每次都应该获取一大块项目并在返回并获取另一大块等之前处理它们......
非常感谢,
代码最少的最简单的解决方案?多处理上下文管理器。
我假设您可以将 "do something with string" 放入一个名为 "do_something"
的函数中from multiprocessing import Pool as ProcessPool
number_of_processes = 4
with ProcessPool(number_of_processes) as pool:
pool.map(do_something, my_strings)
如果您想再次获得 "do_something" 的结果,很简单!
with ProcessPool(number_of_processes) as pool:
results = pool.map(do_something, my_strings)
您会在列表中找到它们。
Multiprocessing.dummy 是进程池的语法包装器,可让您使用多处理语法。如果你想要线程而不是进程,只需这样做:
from multiprocessing.dummy import Pool as ThreadPool
您可以使用 multiprocessing
.
import multiprocessing
def string_fun(string):
# do something with string...
print(string)
my_strings = generate_random_strings()
num_of_threads = 7
pool = multiprocessing.Pool(num_of_threads)
pool.map(string_fun, my_strings)
假设您使用的是最新版本的 Python,您可能想阅读一些关于 asyncio 模块的内容。由于GIL锁,多线程不易实现:"In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe."
因此您可以交换多处理,或者如上文所述,查看 asycio 模块。
asyncio — 异步 I/O > https://docs.python.org/3/library/asyncio.html
我会尽快将此答案与一些代码整合。
希望对你有帮助,
和乐
正如@Hele 提到的,asyncio 是最好的,这里有一个例子
代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# python 3.7.2
from asyncio import ensure_future, gather, run
import random
alphabet = 'ABCDEFGH'
size = 1000
async def generate():
tasks = list()
result = None
for el in range(1, size):
task = ensure_future(generate_one())
tasks.append(task)
result = await gather(*tasks)
return list(set(result))
async def generate_one():
return ''.join(random.choice(alphabet) for i in range(8))
if __name__ == '__main__':
my_strings = run(generate())
print(my_strings)
输出
['CHABCGDD', 'ACBGAFEB', ...
当然,你需要改进generate_one,这个变种很慢。
您可以查看源代码here。