When using Pika BlockingConnection, does basic_ack() have to be placed in the callback function?
Say I have established a RabbitMQ connection as follows:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
    callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos(prefetch_count=3)
To get better concurrency, I tried to put each job onto an internal queue and created a while loop that asynchronously dispatches a worker for each job retrieved from that internal queue:
from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool

task_queue = Queue(10)
pool = ThreadPool(20)

def worker(ch, method, job):
    # ...some heavy lifting...
    if job_gets_done:  # some abstraction
        print "job success"
        ch.basic_ack(delivery_tag=method.delivery_tag)  # PROBLEM: this seems not to work
    else:
        print "job failed"

def callback(ch, method, properties, job):
    task_queue.put((ch, method, job))  # put job in internal queue, block if full

@threaded  # my own helper: runs the decorated function in a background thread
def async_process_jobs():  # loop to get a job and start a thread worker
    while True:
        params = task_queue.get()
        pool.apply_async(worker, params)  # params = (ch, method, job)

async_process_jobs()
channel.start_consuming()
The problem is that while the jobs are being processed, none of them send the acknowledgement correctly, even though the execution flow really passes through the ack (i.e. it prints "job success"). The queue size on rabbitmq stays the same. Why?
In the somewhat official tutorial, basic_ack() is placed inside callback(), but mine is not. Could this be the source of the problem?
Detailed behavior (probably unimportant): say I have 10000 jobs in the queue. At the start, about 2000 messages go into the Unacked state, and then all of them go back to Ready, even though my workers are still processing and printing "job success" (i.e. acking).
From the FAQ of pika:
Pika does not have any notion of threading in the code. If you want to
use Pika with threading, make sure you have a Pika connection per
thread, created in that thread. It is not safe to share one Pika
connection across threads.
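In other words, the worker threads must never touch the channel directly. Here is a minimal sketch of the usual workaround, assuming pika >= 1.0 (where basic_consume takes on_message_callback and BlockingConnection provides add_callback_threadsafe) and a hypothetical, already-declared queue named 'task_queue': do the heavy lifting on a worker thread, then hand the basic_ack back to the connection's own thread:

import functools
import threading

import pika

def do_work(connection, channel, delivery_tag, body):
    # ...heavy lifting on the worker thread...
    # Schedule the ack to run on the connection's thread instead of
    # calling channel.basic_ack from this thread.
    ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(ack)

def on_message(channel, method, properties, body, connection):
    threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body)).start()

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_qos(prefetch_count=3)
channel.basic_consume(
    queue='task_queue',
    on_message_callback=functools.partial(on_message, connection=connection))
channel.start_consuming()

Note that add_callback_threadsafe is the one BlockingConnection method pika documents as safe to call from another thread; everything else, including basic_ack, should run on the thread that owns the connection.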
I ran into a similar problem, and I noticed:
if the job finishes quickly, the ack works;
but if the job takes more time, the ack does not go through, even though it is issued.
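Following the FAQ's one-connection-per-thread rule avoids this race entirely. A sketch under the same assumptions as above (pika 1.x, a hypothetical existing queue 'task_queue'): let each worker thread own its connection and channel end to end, consuming and acking in place instead of fanning deliveries out through an internal Queue:

import threading

import pika

def consume_worker():
    # Each thread owns its connection and channel, so basic_ack
    # always runs on the thread that received the delivery.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        # ...heavy lifting...
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    channel.start_consuming()

threads = [threading.Thread(target=consume_worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

With short jobs the unsafe cross-thread ack often happens to get through before the threads collide, which would explain why it only appears to break on longer jobs.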