Python - Multiprocessing, give each process an email from a text file
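So I have been playing around with multiprocessing and I want to improve my knowledge. Is it possible to read the first line of a text file in process 1, the second line in process 2, and so on?
Text file (email.txt):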
helloworld@world.com
helloworld2@world.com
helloworld3@world.com
helloworld4@world.com
helloworld5@world.com
This is what the code looks like:
import multiprocessing
import sys
import time

from colorama import Fore

def info(thread):
    global prod
    prod = int(thread) + 1
    runit()

def runit():
    log("Profile-" + str(prod) + Fore.GREEN + ' - ' + email)
    # From here I can then use the email for each worker, basically. Or that's the plan at least. The plan is that every worker will have its own email that can be used in here.
    sys.exit()

def main():
    user_input = 0
    while True:
        try:
            user_input = int(input(Fore.WHITE + 'How many tasks do you wanna run? [NUMBERS] \n' + Fore.RESET))
        except ValueError:
            print(Fore.RED + "Stop being stupid" + Fore.RESET)
            continue
        else:
            with open('email.txt') as f:
                content = f.readlines()
            content = [x.strip('\n') for x in content]
            try:
                for i, email in enumerate(content):
                    print(email)
            except ValueError as e:
                print(e)
            HowManyThread = user_input
            i = 0
            jobs = []
            for i in range(HowManyThread):
                p = multiprocessing.Process(target=info, args=(str(i),))
                jobs.append(p)
                time.sleep(.5)
                p.start()
            for p in jobs:
                p.join()
            sys.exit()
log is basically just a log message, nothing special.
Fore.COLOR <-- colors (colorama's Fore), only used to color the output.
However, I have no idea how to actually make each process handle its own email line. So basically:
Process-1 to take helloworld@world.com
Process-2 to take helloworld2@world.com
Process-3 to take helloworld3@world.com
Process-4 to take helloworld4@world.com
Process-5 to take helloworld5@world.com
What would you suggest for doing this? I'm completely lost and have no idea how to move forward.
Update
import json
import sys
import traceback
from multiprocessing import Pool, Process, Queue

from colorama import Fore
from tqdm import tqdm

with open('email.txt') as f:
    content = f.readlines()
global email_list
email_list = [x.strip('\n') for x in content]

def info(thread):
    global prod
    prod = int(thread) + 1
    runit()

def runit(email_index):
    email = email_list[email_index]
    log("Profile-" + str(prod) + Fore.GREEN + ' - ' + email)
    sys.exit()

def main():
    wipe()
    text()
    Infotext = "First name : Last name : Email: : Random char + Street"
    with open('data.json', 'w') as f:
        json.dump(Infotext, f)
        f.write("\n")
    with Pool(8) as pool:
        result_list = list(tqdm(pool.imap_unordered(runit, range(len(email_list)), chunksize=5), total=len(email_list)))

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        print(e)
        traceback.print_exc()
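The following approach delegates the multiprocessing to a pool of workers, each of which receives a chunk of indexes and processes them one at a time (the pool size of 8 and chunksize=5 are arbitrary choices here and can be adjusted to your requirements).
The results of all the workers are then collected into a final list. Note that imap_unordered is only appropriate if you don't care about the order in which the lines are processed (i.e. result_list does not preserve the original order of content).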
from multiprocessing import Pool
# progress bar to track your multiproc
from tqdm import tqdm

# this list will be accessed by each worker; under the "spawn" start method each
# worker re-imports this module and rebuilds it, under "fork" it is inherited
with open('email.txt') as f:
    content = f.readlines()
email_list = [x.strip('\n') for x in content]

# define function that worker will apply to each email
# it gets sent an index for the list of emails
# it accesses the email at that index, performs its function and returns
def runit(email_index):
    email = email_list[email_index]
    # do the stuff you're interested in for a single email
    return email  # placeholder result; replace with your own processing

# run the multiprocessing to get your results
# this sends the indexes for the emails out to the workers
# and collects the results of runit into result list
if __name__ == '__main__':
    with Pool(8) as pool:
        result_list = list(tqdm(pool.imap_unordered(runit,
                                                    range(len(email_list)), chunksize=5),
                                total=len(email_list)))
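If you do need the results in the same order as the lines of email.txt, pool.imap takes the same arguments as imap_unordered but yields results in input order. The sketch below also derives the "Profile-N" number from the index inside the worker instead of relying on the global prod from the question. It is a sketch under assumptions: label_email and label_all are hypothetical names, and it uses multiprocessing.dummy.Pool (the thread-backed pool with the same interface) so it runs without a __main__ guard; swap in multiprocessing.Pool for real processes.

```python
from multiprocessing.dummy import Pool  # thread-backed; same API as multiprocessing.Pool


def label_email(indexed_email):
    """Hypothetical worker: turn (index, email) into a 'Profile-N - email' string."""
    index, email = indexed_email
    # Profile numbers start at 1, so the first line goes to Profile-1
    return "Profile-" + str(index + 1) + " - " + email


def label_all(emails, workers=8, chunksize=5):
    """Run label_email over every email; imap keeps the results in input order."""
    with Pool(workers) as pool:
        # list() consumes all results before the with-block terminates the pool
        return list(pool.imap(label_email, enumerate(emails), chunksize=chunksize))


if __name__ == "__main__":
    emails = ["helloworld@world.com", "helloworld2@world.com", "helloworld3@world.com"]
    for line in label_all(emails):
        print(line)
```

Passing (index, email) pairs via enumerate keeps the worker independent of any module-level list, which also sidesteps the question of whether each worker sees the same globals.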
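What you need is a pool of worker processes, although for your use case I really wonder whether threads (or multiprocessing.dummy) would not be enough.
The pool starts the requested number of worker processes, and you can submit asynchronous tasks to the pool; each task is handled by the first free worker.
A stripped-down version of your example (no fancy printing, no needless reading of the file into a list) could be: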
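To make "submit asynchronous tasks" concrete: apply_async returns an AsyncResult immediately, and calling .get() on it returns the task's value once a worker has run it. A minimal sketch, using the thread-backed multiprocessing.dummy.Pool mentioned above (with multiprocessing.Pool the worker function must live at module top level and the pool must be created under if __name__ == '__main__'); submit_all is a hypothetical helper name.

```python
from multiprocessing.dummy import Pool  # thread pool with the multiprocessing.Pool API


def runit(prod, email):
    # Stand-in for the real per-email work: just build the log line
    return "Profile-" + str(prod) + " - " + email


def submit_all(emails, workers):
    """Hypothetical helper: submit one task per email, then collect the results."""
    pool = Pool(workers)
    # apply_async returns an AsyncResult right away; the task runs on the first free worker
    pending = [pool.apply_async(runit, (prod, email))
               for prod, email in enumerate(emails, start=1)]
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait until every worker has finished
    # .get() returns each task's return value, or re-raises its exception
    return [result.get() for result in pending]


if __name__ == "__main__":
    print(submit_all(["helloworld@world.com", "helloworld2@world.com"], workers=2))
```

Keeping the AsyncResult objects has a second benefit: an exception raised inside runit is re-raised by .get(), whereas in a fire-and-forget loop that never calls .get(), such errors pass silently.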
import multiprocessing
import time

def runit(prod, email):
    print("Profile-" + str(prod) + ' - ' + email)
    # From here I can then use the email for each worker, basically. Or that's the plan at least. The plan is that every worker will have its own email that can be used in here.
    # sys.exit()  # NEVER EXPLICITLY CALL sys.exit() in a worker process
    time.sleep(1)  # to add a delay inside each task

def main():
    while True:
        try:
            HowManyThread = int(input(
                'How many tasks do you wanna run? [NUMBERS] \n'))
        except ValueError:
            print("Stop being stupid")
            continue
        if HowManyThread == 0: break
        pool = multiprocessing.Pool(HowManyThread)
        with open('email.txt') as f:
            for i, email in enumerate(f):
                email = email.strip()
                # runit will be run by a worker process
                pool.apply_async(runit, (i, email))
        pool.close()  # no more tasks to add
        pool.join()   # wait for the last worker to finish

if __name__ == "__main__":
    main()
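If you would rather keep the question's original one-Process-per-task design instead of a pool, the missing piece is to pass each email to its process through args, not just the index. A sketch under that assumption: pair_emails and start_processes are hypothetical names, print stands in for the question's log, and the processes are still started under the if __name__ == '__main__' guard.

```python
import multiprocessing


def pair_emails(lines):
    """Hypothetical helper: pair each non-empty line with a 1-based profile number."""
    cleaned = [line.strip() for line in lines if line.strip()]
    return list(enumerate(cleaned, start=1))


def runit(prod, email):
    # Each process receives its own email as an argument, so no globals are needed
    print("Profile-" + str(prod) + " - " + email)


def start_processes(lines):
    """Start one Process per email (Profile-1 gets line 1, and so on) and wait for all."""
    jobs = []
    for prod, email in pair_emails(lines):
        p = multiprocessing.Process(target=runit, args=(prod, email))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()


if __name__ == "__main__":
    with open("email.txt") as f:
        start_processes(f)
```

This starts as many processes as there are lines, which is fine for five emails but not for thousands; for larger files, a pool that reuses a fixed number of workers is the better fit.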