How can I send a delayed message in RabbitMQ using the rabbitmq-delayed-message-exchange plugin?
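I have installed the rabbitmq-delayed-message-exchange plugin for sending delayed messages.
I couldn't find any help on using it from Python. I have only just started using RabbitMQ.
Here is what I have been trying: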
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare("test-x", type="x-delayed-message", arguments={"x-delayed-type":"direct"})
channel.queue_declare(queue='task_queue',durable=True)
channel.queue_bind(queue="task_queue", exchange="test-x", routing_key="task_queue")
channel.basic_publish(exchange='test-x',routing_key='task_queue',body='Hello World! Delayed',arguments={"x-delay":100})
print(" [x] Sent 'Hello World! Delayed'")
connection.close()
Here are the exchanges listed:
sudo rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct direct
test-x x-delayed-message
amq.fanout fanout
amq.match headers
amq.headers headers
direct
amq.rabbitmq.trace topic
amq.topic topic
amq.rabbitmq.log topic
I don't know how to pass the delay argument to the basic_publish function.
Any help is appreciated.
You need to add the x-delay header to your message properties and specify the delay value in milliseconds. Try this:
channel.basic_publish(
exchange='test-x',
routing_key='task_queue',
body='Hello World! Delayed',
properties=pika.BasicProperties(headers={"x-delay": 1000})
)
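For completeness, here is a minimal end-to-end sketch of the publisher. It assumes pika 1.x, where exchange_declare takes exchange_type rather than the type keyword used in the question, and a broker on localhost with the plugin enabled. The helper names delay_headers and publish_delayed are made up for this illustration; the exchange, queue and routing key names are the ones from the question.

```python
def delay_headers(delay_ms):
    """Build the message headers that carry the delay, in milliseconds."""
    if delay_ms < 0:
        raise ValueError("delay must be non-negative")
    return {"x-delay": int(delay_ms)}


def publish_delayed(body, delay_ms, host="localhost"):
    """Publish one message to the delayed exchange 'test-x' (names from the question)."""
    # Imported here so delay_headers() stays usable without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        channel = connection.channel()
        # The plugin's exchange type; x-delayed-type selects the routing behaviour.
        channel.exchange_declare(
            exchange="test-x",
            exchange_type="x-delayed-message",
            arguments={"x-delayed-type": "direct"},
        )
        channel.queue_declare(queue="task_queue", durable=True)
        channel.queue_bind(
            queue="task_queue", exchange="test-x", routing_key="task_queue"
        )
        # The delay travels as a message header, not as a basic_publish argument.
        channel.basic_publish(
            exchange="test-x",
            routing_key="task_queue",
            body=body,
            properties=pika.BasicProperties(headers=delay_headers(delay_ms)),
        )
    finally:
        connection.close()
```

Calling publish_delayed("Hello World! Delayed", 1000) should make the message show up in task_queue roughly one second after it is published.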
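You can actually delay messages without using the plugin.
Messages in a RabbitMQ queue can be delayed in two ways:
- using queue TTL
- using message TTL
If all messages in the queue are to be delayed for a fixed time, use queue TTL.
If each message has to be delayed by a different amount of time, use message TTL.
I have illustrated it using python3 and the pika module.
The pika BasicProperties parameter 'expiration' must be set, in milliseconds, to delay a message in the delay queue.
After setting the expiration time, publish the message to the delay queue (not the actual queue where consumers are waiting to consume). Once the message in the delay queue expires, it is routed to the actual queue through the exchange 'amq.direct'.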
import json
import logging

import pika

# RABBIT_MQ_USER, RABBIT_MQ_PASS, RABBIT_MQ_PORT and RABBIT_MQ_VHOST are
# module-level settings defined elsewhere; this is a method of a publisher class.
def delay_publish(self, messages, queue, headers=None, expiration=0):
    """
    Connect to RabbitMQ and publish messages to the delay queue.

    Args:
        messages (list or single item): messages to publish to the rabbit queue
        queue (string): name of the queue the consumers read from
        headers (dict): optional message headers
        expiration (int): TTL in milliseconds for each message
    """
    delay_queue = queue + "_delay"
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {host}'.format(
        host=self.rabbit_host))
    credentials = pika.PlainCredentials(
        RABBIT_MQ_USER, RABBIT_MQ_PASS)
    # Older pika releases spell this keyword heartbeat_interval.
    parameters = pika.ConnectionParameters(
        self.rabbit_host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat=0)
    connection = pika.BlockingConnection(parameters)

    # The actual queue, bound to amq.direct with its own name as routing key.
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind(exchange='amq.direct',
                       queue=queue, routing_key=queue)

    # The delay queue dead-letters expired messages to the actual queue.
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    # delivery_mode=2 makes messages persistent; expiration must be a string.
    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    if not isinstance(messages, (list, tuple)):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                # Fall back to an error payload when only 'data' is unserializable.
                if isinstance(message, dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                # Publish through the default exchange straight to the delay queue.
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise
    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()
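The function above uses a per-message TTL. For the fixed-delay case (queue TTL), the delay can instead be set on the delay queue itself through the x-message-ttl queue argument, so publishers do not need to set expiration at all. Below is a minimal sketch, assuming an already open pika channel; delay_queue_arguments and declare_fixed_delay_queue are hypothetical names, and the "_delay" suffix simply follows the function above. One caveat worth checking against the RabbitMQ TTL documentation: messages are only expired once they reach the head of a queue, so mixing different per-message TTLs in a single delay queue can hold a short-delay message behind a longer one.

```python
def delay_queue_arguments(target_queue, delay_ms):
    """Arguments for a queue that holds messages for delay_ms, then dead-letters them."""
    return {
        "x-message-ttl": int(delay_ms),             # queue-wide TTL in milliseconds
        "x-dead-letter-exchange": "amq.direct",     # where expired messages are sent
        "x-dead-letter-routing-key": target_queue,  # routes them to the actual queue
    }


def declare_fixed_delay_queue(channel, target_queue, delay_ms):
    """Declare the actual queue and its fixed-delay queue; return the delay queue name."""
    delay_queue = target_queue + "_delay"
    channel.queue_declare(queue=target_queue, durable=True)
    channel.queue_bind(
        queue=target_queue, exchange="amq.direct", routing_key=target_queue
    )
    channel.queue_declare(
        queue=delay_queue,
        durable=True,
        arguments=delay_queue_arguments(target_queue, delay_ms),
    )
    return delay_queue
```

Messages are then published to the default exchange with routing_key set to the returned delay queue name. Note that redeclaring an existing queue with different arguments fails with a precondition error, so a queue created earlier without x-message-ttl has to be deleted first or given a different name.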