PHP AMQP 延迟队列的实现

Implementation of delayed queue for PHP AMQP

最近快速实现了生产者/消费者队列系统

<?php
namespace Queue;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;    

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        // First, we need to make sure that RabbitMQ will never lose our queue.
        // In order to do so, we need to declare it as durable. To do so we pass
        // the third parameter to queue_declare as true.
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    // Just in case : 
    // We should call close explicitly if possible.
    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            // First, we need to make sure that RabbitMQ will never lose our queue.
            // In order to do so, we need to declare it as durable. To do so we pass
            // the third parameter to queue_declare as true.
            $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        // This tells RabbitMQ not to give more than one message to a worker at
        // a time.
        $this->channel->basic_qos(null, 1, null);

        // Requires ack.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));

        while(count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function consumeCallback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        // Very important to ack, in order to remove msg from queue. Ack after
        // callback, as exception might happen in callback.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    public function getQueueSize()
    {
        // three tuple containing (<queue name>, <message count>, <consumer count>)
        $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
        if ($tuple != null && isset($tuple[1])) {
            return $tuple[1];
        }
        return -1;
    }
}

public function producepublic function consume 对按预期工作。

但是,当它自带延迟队列系统时

public function produceWithDelaypublic function consume 对没有按预期工作。调用consume的消费者,甚至等待了一段时间也没有收到任何物品。

我认为我的 produceWithDelay 实施有些不对劲。我可以知道这是怎么回事吗?

首先验证您的插件 rabbitmq_delayed_message_exchange 是否由 运行 命令启用:rabbitmq-plugins list,如果没有 - 阅读更多信息 here

并且您必须更新 __construct 方法,因为您需要以另一种方式声明队列。我不会假装更新您的构造,但想提供我的简单示例:

声明队列:

<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');

发送消息:

$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();

收到消息:

$callback = function (AMQPMessage $message) {
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

您还可以找到源文件here
希望对您有所帮助!

旁注。

我发现这是我自己的bug造成的。

而不是

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

我应该写在

    if (is_null($this->delayedQueueName))
    {
        $delayedQueueName = $this->queueName . '.delayed';

        $this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
        ...

        $this->delayedQueueName = $delayedQueueName;
    }

我的成员变量还没有正确初始化。

完整的代码如下,供大家参考。

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Amqp
{
    private $connection;
    private $queueName;
    private $delayedQueueName;
    private $channel;
    private $callback;

    public function __construct($host, $port, $login, $password, $queueName)
    {
        $this->connection = new AMQPStreamConnection($host, $port, $login, $password);
        $this->queueName = $queueName;
        $this->delayedQueueName = null;
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    public function __destruct()
    {
        $this->close();
    }

    public function close()
    {
        if (!is_null($this->channel)) {
            $this->channel->close();
            $this->channel = null;
        }

        if (!is_null($this->connection)) {
            $this->connection->close();
            $this->connection = null;
        }
    }

    public function produceWithDelay($data, $delay)
    {
        if (is_null($this->delayedQueueName))
        {
            $delayedQueueName = $this->queueName . '.delayed';

            $this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
                new AMQPTable(array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $this->queueName
                ))
            );

            $this->delayedQueueName = $delayedQueueName;
        }

        $msg = new AMQPMessage(
            $data,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => $delay
            )
        );

        $this->channel->basic_publish($msg, '', $this->delayedQueueName);
    }

    public function produce($data)
    {
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function consume($callback)
    {
        $this->callback = $callback;

        $this->channel->basic_qos(null, 1, null);

        $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function callback($msg)
    {
        call_user_func_array(
            $this->callback,
            array($msg)
        );

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
}

如果您选择 Queue Interop,只需几行代码就可以使延迟消息正常工作。有基于ttl加dead letter exchange方式的解决方案,还有延迟插件。

https://blog.forma-pro.com/rabbitmq-delayed-messaging-da802e3a0aa9