如何使用多处理 运行 Python 中的多个异步进程?

How to run multiple asynchronous processes in Python using multiprocessing?

我需要 运行 多个后台异步函数,使用多处理。我有工作的 Popen 解决方案,但它看起来有点不自然。示例:

from time import sleep
from multiprocessing import Process, Value
import subprocess

def worker_email(keyword):
    subprocess.Popen(["python", "mongoworker.py", str(keyword)])
    return True

keywords_list = ['apple', 'banana', 'orange', 'strawberry']

if __name__ == '__main__':
    for keyword in keywords_list:
        # Do work
        p = Process(target=worker_email, args=(keyword,))
        p.start()
        p.join()

如果我尽量不使用 Popen,例如:

def worker_email(keyword):
    print('Before:' + keyword)
    sleep(10)
    print('After:' + keyword)
    return True

函数 运行 一个接一个,没有异步。那么,如何在不使用Popen的情况下运行同时实现所有功能呢?

UPD: 我正在使用 multiprocessing.Value 到 return 来自 Process 的结果,例如:

def worker_email(keyword, func_result):
    sleep(10)
    print('Yo:' + keyword)
    func_result.value = 1
    return True

func_result = Value('i', 0)
p = Process(target=worker_email, args=(doc['check_id'],func_result))
p.start()
# Change status
if func_result.value == 1:
    stream.update_one({'_id': doc['_id']}, {"$set": {"status": True}}, upsert=False)

但没有 .join() 就无法工作。任何想法如何使其工作或类似方式? :)

如果您只是删除行 p.join(),它应该可以工作。 如果你想在进一步执行之前等待进程完成,你只需要 p.join 。在程序结束时 python 在关闭之前等待所有进程完成,因此您无需担心。

解决了通过将结果检查和状态更新转移到辅助函数中来获取处理结果的问题。类似于:

# Update task status if work is done
def update_status(task_id, func_result):
    # Connect to DB
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    # Update task status if OK
    if func_result:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Close DB connection
    client.close()

# Do work
def yo_func(keyword):
    sleep(10)
    print('Yo:' + keyword)
    return True

# Worker function
def worker_email(keyword, task_id):
    update_status(task_id, yo_func(keyword))