PHP AMQP 延迟队列的实现
Implementation of delayed queue for PHP AMQP
最近快速实现了生产者/消费者队列系统
<?php
namespace Queue;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
// Just in case :
// We should call close explicitly if possible.
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
// This tells RabbitMQ not to give more than one message to a worker at
// a time.
$this->channel->basic_qos(null, 1, null);
// Requires ack.
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function consumeCallback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
// Very important to ack, in order to remove msg from queue. Ack after
// callback, as exception might happen in callback.
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
public function getQueueSize()
{
// three tuple containing (<queue name>, <message count>, <consumer count>)
$tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
if ($tuple != null && isset($tuple[1])) {
return $tuple[1];
}
return -1;
}
}
public function produce
和 public function consume
对按预期工作。
但是,当它自带延迟队列系统时
public function produceWithDelay
和 public function consume
对没有按预期工作。调用consume
的消费者,甚至等待了一段时间也没有收到任何物品。
我认为我的 produceWithDelay
实施有些不对劲。我可以知道这是怎么回事吗?
首先验证您的插件 rabbitmq_delayed_message_exchange
是否由 运行 命令启用:rabbitmq-plugins list
,如果没有 - 阅读更多信息 here。
并且您必须更新 __construct
方法,因为您需要以另一种方式声明队列。我不会假装更新您的构造,但想提供我的简单示例:
声明队列:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');
发送消息:
$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();
收到消息:
$callback = function (AMQPMessage $message) {
printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
您还可以找到源文件here。
希望对您有所帮助!
旁注。
我发现这是我自己的bug造成的。
而不是
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
我应该写在
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
我的成员变量还没有正确初始化。
完整的代码如下,供大家参考。
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function callback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}
如果您选择 Queue Interop,只需几行代码就可以使延迟消息正常工作。有基于ttl加dead letter exchange方式的解决方案,还有延迟插件。
https://blog.forma-pro.com/rabbitmq-delayed-messaging-da802e3a0aa9
最近快速实现了生产者/消费者队列系统
<?php
namespace Queue;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
// Just in case :
// We should call close explicitly if possible.
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
// This tells RabbitMQ not to give more than one message to a worker at
// a time.
$this->channel->basic_qos(null, 1, null);
// Requires ack.
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function consumeCallback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
// Very important to ack, in order to remove msg from queue. Ack after
// callback, as exception might happen in callback.
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
public function getQueueSize()
{
// three tuple containing (<queue name>, <message count>, <consumer count>)
$tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
if ($tuple != null && isset($tuple[1])) {
return $tuple[1];
}
return -1;
}
}
public function produce
和 public function consume
对按预期工作。
但是,当它自带延迟队列系统时
public function produceWithDelay
和 public function consume
对没有按预期工作。调用consume
的消费者,甚至等待了一段时间也没有收到任何物品。
我认为我的 produceWithDelay
实施有些不对劲。我可以知道这是怎么回事吗?
首先验证您的插件 rabbitmq_delayed_message_exchange
是否由 运行 命令启用:rabbitmq-plugins list
,如果没有 - 阅读更多信息 here。
并且您必须更新 __construct
方法,因为您需要以另一种方式声明队列。我不会假装更新您的构造,但想提供我的简单示例:
声明队列:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$args = new AMQPTable(['x-delayed-type' => 'fanout']);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args);
$channel->queue_bind('delayed_queue', 'delayed_exchange');
发送消息:
$data = 'Hello World at ' . date('Y-m-d H:i:s');
$delay = 7000;
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$headers = new AMQPTable(['x-delay' => $delay]);
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');
printf(' [x] Message sent: %s %s', $data, PHP_EOL);
$channel->close();
$connection->close();
收到消息:
$callback = function (AMQPMessage $message) {
printf(' [x] Message received: %s %s', $message->body, PHP_EOL);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
您还可以找到源文件here。
希望对您有所帮助!
旁注。
我发现这是我自己的bug造成的。
而不是
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
我应该写在
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare(delayedQueueName, false, true, false, false, false,
...
$this->delayedQueueName = $delayedQueueName;
}
我的成员变量还没有正确初始化。
完整的代码如下,供大家参考。
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
$this->channel->queue_declare($delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback'));
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function callback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}
如果您选择 Queue Interop,只需几行代码就可以使延迟消息正常工作。有基于ttl加dead letter exchange方式的解决方案,还有延迟插件。
https://blog.forma-pro.com/rabbitmq-delayed-messaging-da802e3a0aa9