python 的 Rabbitmq 工作意外

Rabbitmq with python works unexpected

我正在使用 rabbitmq 做一些练习。

但是rabbmitmq的行为和官网的教程不一样

workertask_sender使用以下代码连接rabbitmq。

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')

并且task_sender通过调用

发送任务
for i in range(10):
    message = "job%s %d %s" % (str(random.randint(1,10)), i , '.'*i)
    channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

worker通过调用获取任务并等待一段时间。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Job Done!")
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

首先我运行task_sender.py发了十份工作,很顺利。

但是当我在不同的 shell 中启动两个 worker.py 时,似乎只有一个工人正在获取任务而另一个工人什么都不做。

更重要的是,当工作worker完成队列中的所有作业时,我运行task_sender.py再次发送新任务,[=的None 22=] 不再获取工作。

rabbitmq 似乎正在阻塞,我该如何解决?

这是我的Rabbitmq status

欢迎任何帮助,提前致谢。

我不确定 pika,但在使用 oslo.messaging 和 rabbitmq 作为后端时我遇到了同样的问题。发生的事情是,通知消息的生产者发送了它,第一个接收消息的听众(消费者)消费了它,而其他听众从未收到它。

oslo 有一个功能 'fanout',它向所有服务器发送通知消息,而不是 'first one winners, others losers' 那种样式。

我想鼠兔大概也有这样的吧。

Oslo notification server documentation

尝试在此处搜索关键字 'fanout'。然后你可以检查 pika 文档是否有类似的东西。希望这有帮助。

在你的 worker 中,尝试将 prefetch_count 设置为合理的值并将 no_ack 设置为 false:

...
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                  queue='task_queue',
                  no_ack=False)
...