python 的 Rabbitmq 工作意外
Rabbitmq with python works unexpected
我正在使用 rabbitmq 做一些练习。
但是rabbmitmq的行为和官网的教程不一样
worker
和task_sender
使用以下代码连接rabbitmq。
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
并且task_sender
通过调用
发送任务
for i in range(10):
message = "job%s %d %s" % (str(random.randint(1,10)), i , '.'*i)
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
和worker
通过调用获取任务并等待一段时间。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Job Done!")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
首先我运行task_sender.py
发了十份工作,很顺利。
但是当我在不同的 shell 中启动两个 worker.py
时,似乎只有一个工人正在获取任务而另一个工人什么都不做。
更重要的是,当工作worker
完成队列中的所有作业时,我运行task_sender.py
再次发送新任务,[=的None 22=] 不再获取工作。
rabbitmq 似乎正在阻塞,我该如何解决?
这是我的Rabbitmq status
欢迎任何帮助,提前致谢。
我不确定 pika,但在使用 oslo.messaging 和 rabbitmq 作为后端时我遇到了同样的问题。发生的事情是,通知消息的生产者发送了它,第一个接收消息的听众(消费者)消费了它,而其他听众从未收到它。
oslo 有一个功能 'fanout',它向所有服务器发送通知消息,而不是 'first one winners, others losers' 那种样式。
我想鼠兔大概也有这样的吧。
Oslo notification server documentation
尝试在此处搜索关键字 'fanout'。然后你可以检查 pika 文档是否有类似的东西。希望这有帮助。
在你的 worker 中,尝试将 prefetch_count 设置为合理的值并将 no_ack 设置为 false:
...
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue',
no_ack=False)
...
我正在使用 rabbitmq 做一些练习。
但是rabbmitmq的行为和官网的教程不一样
worker
和task_sender
使用以下代码连接rabbitmq。
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
并且task_sender
通过调用
for i in range(10):
message = "job%s %d %s" % (str(random.randint(1,10)), i , '.'*i)
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
和worker
通过调用获取任务并等待一段时间。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Job Done!")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
首先我运行task_sender.py
发了十份工作,很顺利。
但是当我在不同的 shell 中启动两个 worker.py
时,似乎只有一个工人正在获取任务而另一个工人什么都不做。
更重要的是,当工作worker
完成队列中的所有作业时,我运行task_sender.py
再次发送新任务,[=的None 22=] 不再获取工作。
rabbitmq 似乎正在阻塞,我该如何解决?
这是我的Rabbitmq status
欢迎任何帮助,提前致谢。
我不确定 pika,但在使用 oslo.messaging 和 rabbitmq 作为后端时我遇到了同样的问题。发生的事情是,通知消息的生产者发送了它,第一个接收消息的听众(消费者)消费了它,而其他听众从未收到它。
oslo 有一个功能 'fanout',它向所有服务器发送通知消息,而不是 'first one winners, others losers' 那种样式。
我想鼠兔大概也有这样的吧。
Oslo notification server documentation
尝试在此处搜索关键字 'fanout'。然后你可以检查 pika 文档是否有类似的东西。希望这有帮助。
在你的 worker 中,尝试将 prefetch_count 设置为合理的值并将 no_ack 设置为 false:
...
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue',
no_ack=False)
...