RabbitMQ 消费者连接在空闲 90 秒后终止
RabbitMQ consumer connection dies after 90 seconds idle
我有一个 RabbitMQ 任务队列和一个 Pika 消费者来消费这些任务(带有 acks)。问题是连接在空闲 90 秒后终止,但我的任务通常需要比这更长的时间。这意味着当任务仍在计算时,它们将返回到任务队列并且永远不会被确认。
使用 RabbitMQ 3.5.3 和 Pika 0.9.14 以及 channel.basic_consume() 方法。连接的 heartbeat_interval 为 30 秒。
消费代码:
import pika
from time import sleep
RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"
def callback(ch, method, properties, body):
print body
sleep(91) # if sleep value < 90 this code works (even 89)
ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()
回溯:
Traceback (most recent call last):
File "main.py", line 19, in <module>
channel.basic_consume(callback, queue=QUEUE_NAME)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
{'consumer_tag': consumer_tag})])
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
self.connection.process_data_events()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
if self._read_poller.ready():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
self.poll_timeout)
select.error: (9, 'Bad file descriptor')
这里的问题是因为你休眠时间太长,导致pika无法响应RabbitMQ的心跳请求,当这种情况发生时,RabbitMQ会关闭连接。
解决此问题的唯一方法是禁用心跳或以更小的间隔休眠并且 运行 process_data_events()
连续休眠,以便鼠兔可以处理心跳。
例如像这样
def amqp_sleep(connection, time_to_sleep=20):
remaining = time_to_sleep
while remaining > 0:
connection.process_data_events()
time.sleep(5)
remaining -= 5
但就个人而言,我会选择一个在后台自动处理心跳的库,这样您就不必处理它们,例如rabbitpy or my own amqp-storm.
我有一个 RabbitMQ 任务队列和一个 Pika 消费者来消费这些任务(带有 acks)。问题是连接在空闲 90 秒后终止,但我的任务通常需要比这更长的时间。这意味着当任务仍在计算时,它们将返回到任务队列并且永远不会被确认。
使用 RabbitMQ 3.5.3 和 Pika 0.9.14 以及 channel.basic_consume() 方法。连接的 heartbeat_interval 为 30 秒。
消费代码:
import pika
from time import sleep
RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"
def callback(ch, method, properties, body):
print body
sleep(91) # if sleep value < 90 this code works (even 89)
ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()
回溯:
Traceback (most recent call last):
File "main.py", line 19, in <module>
channel.basic_consume(callback, queue=QUEUE_NAME)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
{'consumer_tag': consumer_tag})])
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
self.connection.process_data_events()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
if self._read_poller.ready():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
self.poll_timeout)
select.error: (9, 'Bad file descriptor')
这里的问题是因为你休眠时间太长,导致pika无法响应RabbitMQ的心跳请求,当这种情况发生时,RabbitMQ会关闭连接。
解决此问题的唯一方法是禁用心跳或以更小的间隔休眠并且 运行 process_data_events()
连续休眠,以便鼠兔可以处理心跳。
例如像这样
def amqp_sleep(connection, time_to_sleep=20):
remaining = time_to_sleep
while remaining > 0:
connection.process_data_events()
time.sleep(5)
remaining -= 5
但就个人而言,我会选择一个在后台自动处理心跳的库,这样您就不必处理它们,例如rabbitpy or my own amqp-storm.