python 具有多个进程的 input() 块
python input() blocks with multiple processes
我正在尝试使用多处理来执行多个后台作业,并将主进程用作通过 input()
接受命令的用户界面。每个进程都必须完成某些工作,并将其当前状态写入一个字典,该字典是用 manager.dict()
创建的,然后传递给进程。
创建进程后,有一个循环 input()
用于访问用户命令。为简单起见,此处将命令减至最少。
from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")
其他进程运行的方法很简单:
def start_producer(producer_setting: dict, status_dict: dict):
importer = MyProducer(producer_setting)
importer.set_status_dict(status_dict)
importer.run()
我创建一个 MyProducer 实例并通过对象的 setter 设置状态字典并调用阻塞 run()
方法,它只会在生产者完成时 return .在调用 set_status_dict(status_dict)
时,字典中填充了 name
和 status
元素。
当我 运行 代码时,生产者似乎已创建,我收到 "Start Producer test" 和 "initialized 1 producers" 输出,然后收到来自 "Enter your command" 的请求=13=],但是实际过程好像没有运行。
当我按回车键跳过第一个循环迭代时,我得到了预期的 "unknown command" 日志,生产者进程开始实际工作。之后我的 "status" 命令也按预期工作。
当我在第一次循环迭代中输入 'status' 时,我得到一个键错误,因为字典中没有设置 'name' 和 'status'。这些键应该在 set_status_dict()
中设置,它本身在 Process(target=...)
中被调用。
这是为什么?不应该 producer.start() 运行 新进程中 start_producer
的完整块,因此永远不会挂在主进程的 input()
上吗?
如何在没有任何用户输入的情况下先启动进程,然后才等待 input()
?
编辑: 可以在此处找到具有此问题的完整 mvce 程序:https://pastebin.com/k8xvhLhn
编辑: 已找到初始化进程后 sleep(1)
的解决方案。但为什么首先会发生这种行为呢? 运行 start_producer()
运行 中的所有代码不应该在新进程中吗?
我对多处理模块的经验有限,但我能够让它按照(我认为)您想要的方式运行。首先,我在 while 循环的顶部添加了一些打印语句,以查看可能发生的情况,并发现如果进程是 run
或 join
ed,它就会工作。我认为您不希望它阻塞,所以我在流程的后面添加了对 运行 的调用 - 但似乎 run()
也阻塞了。事实证明,当第一个 while 循环迭代出现时,这个过程还没有完成——在循环的顶部添加 time.sleep(30)
给了这个过程足够的时间来安排(由 OS)和 运行。 (在我的机器上它实际上只需要 200 到 300 毫秒的午睡时间)
我用 :
替换了 start_producer
def start_producer(producer_setting: dict, status_dict: dict):
## importer = MyProducer(producer_setting)
## importer.set_status_dict(status_dict)
## importer.run()
#time.sleep(30)
status_dict['name'] = 'foo'
status_dict['status'] = 'thinking'
您修改的代码:
if __name__ == '__main__':
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
# add a call to run() but it blocks
#producer.run()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
time.sleep(30)
for p, s in producers:
#p.join()
#p.run()
print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
if 'name' in s and 'status' in s:
print('{0}:{1}'.format(s['name'], s['status']))
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")
我正在尝试使用多处理来执行多个后台作业,并将主进程用作通过 input()
接受命令的用户界面。每个进程都必须完成某些工作,并将其当前状态写入一个字典,该字典是用 manager.dict()
创建的,然后传递给进程。
创建进程后,有一个循环 input()
用于访问用户命令。为简单起见,此处将命令减至最少。
from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")
其他进程运行的方法很简单:
def start_producer(producer_setting: dict, status_dict: dict):
importer = MyProducer(producer_setting)
importer.set_status_dict(status_dict)
importer.run()
我创建一个 MyProducer 实例并通过对象的 setter 设置状态字典并调用阻塞 run()
方法,它只会在生产者完成时 return .在调用 set_status_dict(status_dict)
时,字典中填充了 name
和 status
元素。
当我 运行 代码时,生产者似乎已创建,我收到 "Start Producer test" 和 "initialized 1 producers" 输出,然后收到来自 "Enter your command" 的请求=13=],但是实际过程好像没有运行。
当我按回车键跳过第一个循环迭代时,我得到了预期的 "unknown command" 日志,生产者进程开始实际工作。之后我的 "status" 命令也按预期工作。
当我在第一次循环迭代中输入 'status' 时,我得到一个键错误,因为字典中没有设置 'name' 和 'status'。这些键应该在 set_status_dict()
中设置,它本身在 Process(target=...)
中被调用。
这是为什么?不应该 producer.start() 运行 新进程中 start_producer
的完整块,因此永远不会挂在主进程的 input()
上吗?
如何在没有任何用户输入的情况下先启动进程,然后才等待 input()
?
编辑: 可以在此处找到具有此问题的完整 mvce 程序:https://pastebin.com/k8xvhLhn
编辑: 已找到初始化进程后 sleep(1)
的解决方案。但为什么首先会发生这种行为呢? 运行 start_producer()
运行 中的所有代码不应该在新进程中吗?
我对多处理模块的经验有限,但我能够让它按照(我认为)您想要的方式运行。首先,我在 while 循环的顶部添加了一些打印语句,以查看可能发生的情况,并发现如果进程是 run
或 join
ed,它就会工作。我认为您不希望它阻塞,所以我在流程的后面添加了对 运行 的调用 - 但似乎 run()
也阻塞了。事实证明,当第一个 while 循环迭代出现时,这个过程还没有完成——在循环的顶部添加 time.sleep(30)
给了这个过程足够的时间来安排(由 OS)和 运行。 (在我的机器上它实际上只需要 200 到 300 毫秒的午睡时间)
我用 :
替换了start_producer
def start_producer(producer_setting: dict, status_dict: dict):
## importer = MyProducer(producer_setting)
## importer.set_status_dict(status_dict)
## importer.run()
#time.sleep(30)
status_dict['name'] = 'foo'
status_dict['status'] = 'thinking'
您修改的代码:
if __name__ == '__main__':
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
# add a call to run() but it blocks
#producer.run()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
time.sleep(30)
for p, s in producers:
#p.join()
#p.run()
print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
if 'name' in s and 'status' in s:
print('{0}:{1}'.format(s['name'], s['status']))
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")