How can I send a delayed message in RabbitMQ using the rabbitmq-delayed-message-exchange plugin?

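I have installed the rabbitmq-delayed-message-exchange plugin from here to send delayed messages.

I couldn't find any help on using it from Python. I have only just started using RabbitMQ.

Here is what I have been trying: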

import pika  
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare("test-x", type="x-delayed-message", arguments={"x-delayed-type":"direct"})  
channel.queue_declare(queue='task_queue',durable=True)
channel.queue_bind(queue="task_queue", exchange="test-x", routing_key="task_queue")
channel.basic_publish(exchange='test-x',routing_key='task_queue',body='Hello World! Delayed',arguments={"x-delay":100})
print(" [x] Sent 'Hello World! Delayed'")
connection.close()

Here are the exchanges that are listed:

sudo rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct  direct
test-x  x-delayed-message
amq.fanout  fanout
amq.match   headers
amq.headers headers
    direct
amq.rabbitmq.trace  topic
amq.topic   topic
amq.rabbitmq.log    topic

I don't know how to pass the delay parameter to the basic_publish function.

Any help is appreciated.

You need to add the x-delay header to your message properties and specify the delay in milliseconds. Try this:

channel.basic_publish(
    exchange='test-x',
    routing_key='task_queue',
    body='Hello World! Delayed',
    properties=pika.BasicProperties(headers={"x-delay": 1000})
)
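
Putting the pieces together, a minimal end-to-end sketch might look like the following. It assumes pika 1.x (where the exchange type is passed as `exchange_type` rather than `type`), a broker on `localhost` with the plugin enabled, and it reuses the `test-x` exchange and `task_queue` queue from the question. The helper names `delay_headers` and `publish_delayed` are made up for illustration.

```python
def delay_headers(delay_ms):
    """Build the message headers that carry the delay for the plugin.

    Rejecting negative or non-integer values is a choice made in this
    sketch, not something the source answer prescribes.
    """
    if isinstance(delay_ms, bool) or not isinstance(delay_ms, int) or delay_ms < 0:
        raise ValueError("delay_ms must be a non-negative integer (milliseconds)")
    return {"x-delay": delay_ms}


def publish_delayed(body, delay_ms, host="localhost"):
    """Declare the delayed exchange and queue, then publish one delayed message."""
    # Imported here so delay_headers() can be used without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        channel = connection.channel()
        # The plugin's exchange type; "x-delayed-type" selects the routing
        # behaviour applied once the delay has elapsed.
        channel.exchange_declare(
            exchange="test-x",
            exchange_type="x-delayed-message",
            arguments={"x-delayed-type": "direct"},
        )
        channel.queue_declare(queue="task_queue", durable=True)
        channel.queue_bind(
            queue="task_queue", exchange="test-x", routing_key="task_queue"
        )
        # The delay travels in the message headers, not as a keyword
        # argument of basic_publish.
        channel.basic_publish(
            exchange="test-x",
            routing_key="task_queue",
            body=body,
            properties=pika.BasicProperties(headers=delay_headers(delay_ms)),
        )
    finally:
        connection.close()
```

With a broker running, calling `publish_delayed("Hello World! Delayed", 5000)` should make the message show up in `task_queue` roughly five seconds after it is published.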

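You can actually delay messages without using the plugin. Messages in a RabbitMQ queue can be delayed in two ways:

- using a queue TTL
- using a message TTL

If every message in the queue should be delayed by the same fixed time, use a queue TTL. If each message needs a different delay, use a message TTL.

I have illustrated the message TTL approach with python3 and the pika module. To delay a message, set the 'expiration' property of pika's BasicProperties (a TTL in milliseconds, passed as a string) and publish the message to a delay queue, which is not the actual queue that consumers are waiting on. Once the message expires in the delay queue, it is dead-lettered through the exchange 'amq.direct' and routed to the actual queue. The delay_publish function further down does exactly this.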
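
For the fixed-delay case (queue TTL), the delay can be set once on the delay queue with the `x-message-ttl` queue argument instead of on every message. The sketch below is only an illustration: `delay_queue_arguments` and `declare_fixed_delay_queue` are hypothetical helper names, `channel` is assumed to be an open pika channel, and the `_delay` suffix simply mirrors the naming used in the function further down.

```python
def delay_queue_arguments(target_queue, ttl_ms):
    """Queue arguments that turn a queue into a fixed-delay holding queue."""
    return {
        # Every message in this queue expires after ttl_ms milliseconds...
        "x-message-ttl": ttl_ms,
        # ...and expired messages are dead-lettered to amq.direct, which
        # routes them to the real queue by its name.
        "x-dead-letter-exchange": "amq.direct",
        "x-dead-letter-routing-key": target_queue,
    }


def declare_fixed_delay_queue(channel, target_queue, ttl_ms):
    """Declare the real queue and its delay queue; return the delay queue name."""
    delay_queue = target_queue + "_delay"
    channel.queue_declare(queue=target_queue, durable=True)
    channel.queue_bind(
        queue=target_queue, exchange="amq.direct", routing_key=target_queue
    )
    channel.queue_declare(
        queue=delay_queue,
        durable=True,
        arguments=delay_queue_arguments(target_queue, ttl_ms),
    )
    return delay_queue
```

Messages are then published to the returned delay queue through the default exchange (`exchange=''`, `routing_key=delay_queue`) without any `expiration` property, since the queue itself supplies the TTL.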

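import json
import logging

import pika

# RABBIT_MQ_USER, RABBIT_MQ_PASS, RABBIT_MQ_PORT and RABBIT_MQ_VHOST are
# settings assumed to be defined elsewhere. The function is a method of a
# class that provides self.rabbit_host.
def delay_publish(self, messages, queue, headers=None, expiration=0):
    """
    Connect to RabbitMQ and publish messages to the delay queue.

    Args:
        messages (list or single item): messages to publish to the delay queue
        queue (string): name of the actual queue that consumers read from
        headers (dict): optional message headers
        expiration (int): per-message TTL in milliseconds, i.e. the delay
    """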
    delay_queue = "".join([queue, "_delay"])
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {host}'.format(
        host=self.rabbit_host))
    credentials = pika.PlainCredentials(
        RABBIT_MQ_USER, RABBIT_MQ_PASS)
    # `heartbeat` is the current keyword; older pika releases called it
    # `heartbeat_interval`.
    parameters = pika.ConnectionParameters(
        self.rabbit_host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat=0)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

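    # Bind the actual queue to amq.direct under its own name so that
    # dead-lettered messages are routed to it.
    channel.queue_bind(exchange='amq.direct',
                       queue=queue, routing_key=queue)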
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    # Accept a single message as well as a list or tuple of messages.
    if not isinstance(messages, (list, tuple)):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                if (type(message) is dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise

    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()
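
One caveat with the per-message TTL approach is worth keeping in mind: as far as the RabbitMQ documentation describes it, a queue only expires and dead-letters messages once they reach its head, so a short-delay message published behind a long-delay one is held back until the earlier message leaves. The toy model below involves no broker at all; `release_times` is a hypothetical helper that assumes all messages are published at time zero into one FIFO delay queue.

```python
from itertools import accumulate


def release_times(ttls_ms):
    """Return when each message leaves a FIFO delay queue, in milliseconds.

    A message cannot leave before its own TTL has elapsed, nor before the
    message ahead of it has left, so the result is a running maximum.
    """
    return list(accumulate(ttls_ms, max))


print(release_times([1000, 5000]))  # [1000, 5000]
print(release_times([5000, 1000]))  # [5000, 5000]: the 1 s message waits 5 s
```

If messages with very different delays share one delay queue, it may therefore be safer to use one delay queue per delay value, or the plugin shown in the other answer.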