多处理追加到没有地图的池
Multiprocessing append to pool without map
我有一个函数可以将一堆字符串加密为 md5,在其中,我有一个我创建的池。
Main.py
config = ConfigParser()
config.read("config.ini")
possibleCharacters = "abcd"
def mapped_loop_digit(args):
loop_digit(*args, is_pool=True)
def loop_digit(current_str, place, strings, hashes, is_outer=False, is_pool=False):
if place == config.getint("string_creation", "length_for_new_process"):
current_strings = list()
for character in possibleCharacters:
current_str[place] = character
if is_outer and config.getboolean("development", "minor_logging"):
print("Outer character maker at", possibleCharacters.index(character) + 1, "in", len(possibleCharacters))
elif is_pool and config.getboolean("development", "pool_minor_logging"):
print("Outest in pool character maker for process", multiprocessing.current_process()._identity[0],
"at", possibleCharacters.index(character) + 1, "in", len(possibleCharacters), "with character as",
str(character) + ". Current string is", current_str)
if place == 0:
string = "".join(_character for _character in current_str)
hashes.append(hashlib.md5(string.encode()).hexdigest())
strings.append(string)
elif place == config.getint("string_creation", "length_for_new_process"):
current_strings.append(current_str.copy())
else:
loop_digit(current_str, place - 1, strings, hashes)
if place == config.getint("string_creation", "length_for_new_process"):
args = list()
print("Starting a new pool")
for string in current_strings:
args.append([string, place - 1, strings, hashes])
with multiprocessing.Pool(processes=config.getint("string_creation", "processes")) as pool:
pool.map(mapped_loop_digit, args)
pool.close()
pool.join()
manager = multiprocessing.Manager()
all_strings = manager.list("")
all_hashes = manager.list("")
loop_digit(["", "", "", ""], 4 - 1, all_strings, all_hashes, is_outer=True)
config.ini
[development]
minor_logging = 1
pool_minor_logging = 1
[string_creation]
processes = 3
length_for_new_process = 3
目前我有一个名为 current_strings 的列表,并在程序中间附加到它,然后在最后循环遍历它并创建一个参数列表,然后将其映射到一个单独的函数,然后再次 运行 原始函数。有没有更简单的方法来做到这一点,所以我可以只追加到池而不是列表。
如果您将 Pool
创建为
pool = multiprocessing.Pool(5)
没有pool.close()
pool.join()
那么你可以在不同的地方(在不同的函数中)多次使用pool
。
如果您使用 map_async()
而不是 map()
那么您不必等待进程结束,您可以使用下一个 map_async()
和 [=17 添加更多进程=] 将一起管理所有进程。
您还可以使用 apply_async
将单个过程添加到现有 pool
。
因为map_async
和apply_async
不等待进程结束所以你必须在退出程序前使用wait()
控制它
it1 = pool.map_async(...)
it2 = pool.map_async(...)
it3 = pool.apply_async(...)
# ... code ...
it1.wait()
it2.wait()
it3.wait()
或者你必须在最后使用(两者)
pool.close()
pool.join()
如果您不使用它,那么程序可能会在进程完成之前退出并终止它们。
最小工作示例
import multiprocessing
import time
def fun(number):
for x in range(3):
time.sleep(.2)
print(number, 'loop:', x)
if __name__ == '__main__':
pool = multiprocessing.Pool(2)
print("map [1,2,3]")
it1 = pool.map_async(fun, [1,2,3])
print("map ['A', 'B', 'C']")
it2 = pool.map_async(fun, ['A', 'B', 'C'])
print("single work X")
it3 = pool.apply_async(fun, 'X')
print("single work Y")
it4 = pool.apply_async(fun, 'Y')
# wait for the end of processes
print('wait for the end of processes')
#it1.wait()
#it2.wait()
#it3.wait()
#it4.wait()
pool.close()
pool.join()
print('exit')
我有一个函数可以将一堆字符串加密为 md5,在其中,我有一个我创建的池。
Main.py
config = ConfigParser()
config.read("config.ini")
possibleCharacters = "abcd"
def mapped_loop_digit(args):
loop_digit(*args, is_pool=True)
def loop_digit(current_str, place, strings, hashes, is_outer=False, is_pool=False):
if place == config.getint("string_creation", "length_for_new_process"):
current_strings = list()
for character in possibleCharacters:
current_str[place] = character
if is_outer and config.getboolean("development", "minor_logging"):
print("Outer character maker at", possibleCharacters.index(character) + 1, "in", len(possibleCharacters))
elif is_pool and config.getboolean("development", "pool_minor_logging"):
print("Outest in pool character maker for process", multiprocessing.current_process()._identity[0],
"at", possibleCharacters.index(character) + 1, "in", len(possibleCharacters), "with character as",
str(character) + ". Current string is", current_str)
if place == 0:
string = "".join(_character for _character in current_str)
hashes.append(hashlib.md5(string.encode()).hexdigest())
strings.append(string)
elif place == config.getint("string_creation", "length_for_new_process"):
current_strings.append(current_str.copy())
else:
loop_digit(current_str, place - 1, strings, hashes)
if place == config.getint("string_creation", "length_for_new_process"):
args = list()
print("Starting a new pool")
for string in current_strings:
args.append([string, place - 1, strings, hashes])
with multiprocessing.Pool(processes=config.getint("string_creation", "processes")) as pool:
pool.map(mapped_loop_digit, args)
pool.close()
pool.join()
manager = multiprocessing.Manager()
all_strings = manager.list("")
all_hashes = manager.list("")
loop_digit(["", "", "", ""], 4 - 1, all_strings, all_hashes, is_outer=True)
config.ini
[development]
minor_logging = 1
pool_minor_logging = 1
[string_creation]
processes = 3
length_for_new_process = 3
目前我有一个名为 current_strings 的列表,并在程序中间附加到它,然后在最后循环遍历它并创建一个参数列表,然后将其映射到一个单独的函数,然后再次 运行 原始函数。有没有更简单的方法来做到这一点,所以我可以只追加到池而不是列表。
如果您将 Pool
创建为
pool = multiprocessing.Pool(5)
没有pool.close()
pool.join()
那么你可以在不同的地方(在不同的函数中)多次使用pool
。
如果您使用 map_async()
而不是 map()
那么您不必等待进程结束,您可以使用下一个 map_async()
和 [=17 添加更多进程=] 将一起管理所有进程。
您还可以使用 apply_async
将单个过程添加到现有 pool
。
因为map_async
和apply_async
不等待进程结束所以你必须在退出程序前使用wait()
控制它
it1 = pool.map_async(...)
it2 = pool.map_async(...)
it3 = pool.apply_async(...)
# ... code ...
it1.wait()
it2.wait()
it3.wait()
或者你必须在最后使用(两者)
pool.close()
pool.join()
如果您不使用它,那么程序可能会在进程完成之前退出并终止它们。
最小工作示例
import multiprocessing
import time
def fun(number):
for x in range(3):
time.sleep(.2)
print(number, 'loop:', x)
if __name__ == '__main__':
pool = multiprocessing.Pool(2)
print("map [1,2,3]")
it1 = pool.map_async(fun, [1,2,3])
print("map ['A', 'B', 'C']")
it2 = pool.map_async(fun, ['A', 'B', 'C'])
print("single work X")
it3 = pool.apply_async(fun, 'X')
print("single work Y")
it4 = pool.apply_async(fun, 'Y')
# wait for the end of processes
print('wait for the end of processes')
#it1.wait()
#it2.wait()
#it3.wait()
#it4.wait()
pool.close()
pool.join()
print('exit')