通过 multiprocessing 的多个消费者 Rabbitmq
Multiple consumer Rabbitmq through multiprocessing
python 的新手。
我正在尝试为 RabbitMQ 客户端创建多个消费者。
我正在使用 PIKA 并尝试进行多处理。
它似乎连接但无法维持循环。
你能帮忙吗?
该部分代码还应通过回调处理编写器选项。
它应该开始循环并且应该总是消耗
import multiprocessing
import time
import pika
# this is the writer part
def callback(ch, method, properties, body):
print (" [x] %r received %r" % (multiprocessing.current_process(), body,))
time.sleep(body.count('.'))
# print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume():
credentials = pika.PlainCredentials(userid, password)
parameters = pika.ConnectionParameters(url, port, '/', credentials)
connection = pika.BlockingConnection(
parameters=parameters)
channel = connection.channel()
channel.queue_declare(queue='queuename', durable=True)
channel.basic_consume('queuename',callback)
print (' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
userid = "user"
password = "pwd"
url = "localhost"
port = 5672
if __name__ == "__main__":
workers = 5
pool = multiprocessing.Pool(processes=workers)
for i in range(0, workers):
pool.apply_async(consume)
#Stay alive
try:
while True:
您没有在您的 sub-processes 中进行任何异常处理,所以我的猜测是抛出了您不期望的异常。 This code 在我的环境中工作正常,使用 Pika 1.1.0 和 Python 3.7.3.
在我检查 body.count()
中的异常之前,会抛出 TypeError
,因为在那种情况下 body
不是 str
。
请注意,根据 these docs。
,我正在使用正确的方法等待 sub-processes
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
python 的新手。 我正在尝试为 RabbitMQ 客户端创建多个消费者。 我正在使用 PIKA 并尝试进行多处理。 它似乎连接但无法维持循环。 你能帮忙吗? 该部分代码还应通过回调处理编写器选项。
它应该开始循环并且应该总是消耗
import multiprocessing
import time
import pika
# this is the writer part
def callback(ch, method, properties, body):
print (" [x] %r received %r" % (multiprocessing.current_process(), body,))
time.sleep(body.count('.'))
# print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume():
credentials = pika.PlainCredentials(userid, password)
parameters = pika.ConnectionParameters(url, port, '/', credentials)
connection = pika.BlockingConnection(
parameters=parameters)
channel = connection.channel()
channel.queue_declare(queue='queuename', durable=True)
channel.basic_consume('queuename',callback)
print (' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
userid = "user"
password = "pwd"
url = "localhost"
port = 5672
if __name__ == "__main__":
workers = 5
pool = multiprocessing.Pool(processes=workers)
for i in range(0, workers):
pool.apply_async(consume)
#Stay alive
try:
while True:
您没有在您的 sub-processes 中进行任何异常处理,所以我的猜测是抛出了您不期望的异常。 This code 在我的环境中工作正常,使用 Pika 1.1.0 和 Python 3.7.3.
在我检查 body.count()
中的异常之前,会抛出 TypeError
,因为在那种情况下 body
不是 str
。
请注意,根据 these docs。
,我正在使用正确的方法等待 sub-processes注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。