Python - Asynch Multiprocessing from RabbitMQ Consumer

I have a Python program that acts as a RabbitMQ consumer. Once it receives a job from the queue, I want the program to split the job up using multiprocessing, but I am running into logistical problems with multiprocessing.

I have simplified the code for readability.

My RabbitMQ consumer function:

import json
import logging
import time

import pika

import process_manager

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue="JobReader", durable=True)
logging.info('Waiting for messages..')


def callback(ch, method, properties, body):
    job_info = json.loads(body)

    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))

    split_jobs = split_job(job_info)

    process_manager.runProcesses(split_jobs)

    ch.basic_ack(delivery_tag=method.delivery_tag)

My multiprocessing function (process_manager):

#!/usr/bin/python

import multiprocessing
import other_package


def worker_process(sub_job):
    other_package.run_job(sub_job)


def runProcesses(jobs):
    processes = []
    for sub_job in jobs:
        p = multiprocessing.Process(target=worker_process, args=(sub_job,))
        processes.append(p)

        p.start()

Of course, I can't use if __name__ == '__main__': there, because the processes are started inside a function.

I'm not sure whether multiprocessing has a way around this, or whether I'm simply approaching the problem the wrong way. Any help would be appreciated.

You can restructure the multiprocessing part so that its state is initialized from the main script:

import process_manager
...

def callback(ch, method, properties, body):
    job_info = json.loads(body)
    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))
    split_jobs = split_job(job_info)
    manager.runProcesses(split_jobs)
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
    manager = process_manager.get_manager()
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue="JobReader", durable=True)
    logging.info('Waiting for messages..')

    # Register the callback and block consuming (pika >= 1.0 signature).
    channel.basic_consume(queue="JobReader", on_message_callback=callback)
    channel.start_consuming()

Then process_manager would look something like this:

import multiprocessing
import other_package

def worker_process(sub_job):
    other_package.run_job(sub_job)

_manager = None

def get_manager(): # Note that you don't have to use a singleton here
    global _manager
    if not _manager:
        _manager = Manager()
    return _manager


class Manager(object):
    def __init__(self):
        self._pool = multiprocessing.Pool()

    def runProcesses(self, jobs):
        self._pool.map_async(worker_process, jobs)

Note that I used a Pool instead of spawning a Process per job, because that may not scale well.
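To make the Pool behaviour concrete, here is a minimal, self-contained sketch of the same pattern. worker_process just squares its input as a stand-in for other_package.run_job, and .get() is called only to demonstrate that map_async hands the work to the pool without blocking and lets you collect the results later:

```python
import multiprocessing


def worker_process(sub_job):
    # Stand-in for other_package.run_job(sub_job): square the job payload.
    return sub_job * sub_job


class Manager(object):
    def __init__(self):
        # The pool's worker processes are created once, up front, instead of
        # paying the process-startup cost for every incoming job.
        self._pool = multiprocessing.Pool()

    def runProcesses(self, jobs):
        # map_async returns an AsyncResult immediately; the calling thread
        # (e.g. the RabbitMQ callback) is not blocked while sub-jobs run.
        return self._pool.map_async(worker_process, jobs)


if __name__ == "__main__":
    manager = Manager()
    async_result = manager.runProcesses([1, 2, 3])
    print(async_result.get())  # blocks until all sub-jobs finish: [1, 4, 9]
```

One design consequence worth noting: if the consumer acks the message right after map_async returns (as in the callback above), the ack happens before the sub-jobs complete; use the AsyncResult if you need to wait for or inspect the outcome first.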