通过 multiprocessing 的多个消费者 Rabbitmq

Multiple consumer Rabbitmq through multiprocessing

python 的新手。 我正在尝试为 RabbitMQ 客户端创建多个消费者。 我正在使用 PIKA 并尝试进行多处理。 它似乎连接但无法维持循环。 你能帮忙吗? 该部分代码还应通过回调处理编写器选项。

它应该开始循环并且应该总是消耗

import multiprocessing
import time
import pika
# this is the writer part
def callback(ch, method, properties, body):
    print (" [x] %r received %r" % (multiprocessing.current_process(), body,))
    time.sleep(body.count('.'))
    # print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume():
    credentials = pika.PlainCredentials(userid, password)
    parameters = pika.ConnectionParameters(url, port, '/', credentials)
    connection = pika.BlockingConnection(
        parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='queuename', durable=True)
    channel.basic_consume('queuename',callback)
    print (' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

userid = "user"
password = "pwd"
url = "localhost"
port = 5672

if __name__ == "__main__":

    workers = 5
    pool = multiprocessing.Pool(processes=workers)
    for i in range(0, workers):
        pool.apply_async(consume)

    #Stay alive
    try:
        while True:

您没有在您的 sub-processes 中进行任何异常处理,所以我的猜测是抛出了您不期望的异常。 This code 在我的环境中工作正常,使用 Pika 1.1.0 和 Python 3.7.3.

在我检查 body.count() 中的异常之前,会抛出 TypeError,因为在那种情况下 body 不是 str

请注意,根据 these docs

,我正在使用正确的方法等待 sub-processes

注意: RabbitMQ 团队监控 rabbitmq-users mailing list 并且有时只在 Whosebug 上回答问题。