python 具有多个进程的 input() 块

python input() blocks with multiple processes

我正在尝试使用多处理来执行多个后台作业,并将主进程用作通过 input() 接受命令的用户界面。每个进程都必须完成某些工作,并将其当前状态写入一个字典,该字典是用 manager.dict() 创建的,然后传递给进程。

创建进程后,有一个循环 input() 用于访问用户命令。为简单起见,此处将命令减至最少。

from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
    producers = []
    settings = [{'name':'test'}]

    for setting in settings:
        status = manager.dict()
        logger.info("Start Producer {0}".format(setting['name']))
        producer = Process(target=start_producer, args=(setting, status))
        producer.start()
        producers.append([producer, status])

    logger.info("initialized {0} producers".format(len(producers)))

    while True:
        text_command = input('Enter your command:')
        if text_command == 'exit':
            logger.info("waiting for producers")
            for p in producers:
                p[0].join()
            logger.info("Exit application.")
            break
        elif text_command == 'status':
            for p in producers:
                if 'name' in p[1] and 'status' in p[1]:
                    print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
        else:
            print("Unknown command.")

其他进程运行的方法很简单:

def start_producer(producer_setting: dict, status_dict: dict):
    importer = MyProducer(producer_setting)
    importer.set_status_dict(status_dict)
    importer.run()

我创建一个 MyProducer 实例并通过对象的 setter 设置状态字典并调用阻塞 run() 方法,它只会在生产者完成时 return .在调用 set_status_dict(status_dict) 时,字典中填充了 namestatus 元素。

当我 运行 代码时,生产者似乎已创建,我收到 "Start Producer test" 和 "initialized 1 producers" 输出,然后收到来自 "Enter your command" 的请求=13=],但是实际过程好像没有运行。

当我按回车键跳过第一个循环迭代时,我得到了预期的 "unknown command" 日志,生产者进程开始实际工作。之后我的 "status" 命令也按预期工作。

当我在第一次循环迭代中输入 'status' 时,我得到一个键错误,因为字典中没有设置 'name' 和 'status'。这些键应该在 set_status_dict() 中设置,它本身在 Process(target=...) 中被调用。

这是为什么?不应该 producer.start() 运行 新进程中 start_producer 的完整块,因此永远不会挂在主进程的 input() 上吗?

如何在没有任何用户输入的情况下先启动进程,然后才等待 input()

编辑: 可以在此处找到具有此问题的完整 mvce 程序:https://pastebin.com/k8xvhLhn

编辑: 已找到初始化进程后 sleep(1) 的解决方案。但为什么首先会发生这种行为呢? 运行 start_producer() 运行 中的所有代码不应该在新进程中吗?

我对多处理模块的经验有限,但我能够让它按照(我认为)您想要的方式运行。首先,我在 while 循环的顶部添加了一些打印语句,以查看可能发生的情况,并发现如果进程是 runjoined,它就会工作。我认为您不希望它阻塞,所以我在流程的后面添加了对 运行 的调用 - 但似乎 run() 也阻塞了。事实证明,当第一个 while 循环迭代出现时,这个过程还没有完成——在循环的顶部添加 time.sleep(30) 给了这个过程足够的时间来安排(由 OS)和 运行。 (在我的机器上它实际上只需要 200 到 300 毫秒的午睡时间)

我用 :

替换了 start_producer
def start_producer(producer_setting: dict, status_dict: dict):
##    importer = MyProducer(producer_setting)
##    importer.set_status_dict(status_dict)
##    importer.run()
    #time.sleep(30)
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'

您修改的代码:

if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        settings = [{'name':'test'}]

        for setting in settings:
            status = manager.dict()
            logger.info("Start Producer {0}".format(setting['name']))
            producer = Process(target=start_producer, args=(setting, status))
            producer.start()
            # add a call to run() but it blocks
            #producer.run()
            producers.append([producer, status])

        logger.info("initialized {0} producers".format(len(producers)))

        while True:
            time.sleep(30)
            for p, s in producers:
                #p.join()
                #p.run()
                print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
                if 'name' in s and 'status' in s:
                    print('{0}:{1}'.format(s['name'], s['status']))
            text_command = input('Enter your command:')
            if text_command == 'exit':
                logger.info("waiting for producers")
                for p in producers:
                    p[0].join()
                logger.info("Exit application.")
                break
            elif text_command == 'status':
                for p in producers:
                    if 'name' in p[1] and 'status' in p[1]:
                        print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
            else:
                print("Unknown command.")