以固定时间间隔消费消息/一次只消费一条消息
consume messages on fixed time interval / only one message at once
由于外部 API 的限制,我只想在固定的时间间隔内处理消息。一次只有一条消息。
要按顺序处理所有消息,我的代码如下所示:
$conn = new AMQPConnection($rbmqHOST, $rbmqPORT, $rbmqUSER, $rbmqPASS, $rbmqVHOST);
$ch = $conn->channel();
$ch->exchange_declare($exchange, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $exchange, $queue);
function process_message($msg)
{
//do something ..
//ack msg
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
$ch->basic_consume($queue, $queue, false, false, false, false, 'process_message');
function shutdown($ch, $conn)
{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
while (count($ch->callbacks)) {
$ch->wait();
}
在处理步骤中调用 sleep()
无效。
有什么建议吗?
好的,知道了。 (:
启发式的工作原理如下:
- 在所需的时间间隔内通过 cronjob 启动 php 消费者进程。
消息处理完毕后取消消费者。
...
function process_message($msg)
{
//do something ..
//ack msg
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//cancel consumer
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
}
...
启动一个消费者进程后,检查是否已经有另一个消费者进程运行。如果是这样,请中断以避免多个并行过程。只需检查 queue_declare()
:
中的 return 值
...
list(,,$consumerCount) = $ch->queue_declare($rbmqAmazonInit, false, true, false, false);
if ($consumerCount >= 1) {
//another consumer is already runing
exit;
}
...
由于外部 API 的限制,我只想在固定的时间间隔内处理消息。一次只有一条消息。
要按顺序处理所有消息,我的代码如下所示:
$conn = new AMQPConnection($rbmqHOST, $rbmqPORT, $rbmqUSER, $rbmqPASS, $rbmqVHOST);
$ch = $conn->channel();
$ch->exchange_declare($exchange, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $exchange, $queue);
function process_message($msg)
{
//do something ..
//ack msg
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
$ch->basic_consume($queue, $queue, false, false, false, false, 'process_message');
function shutdown($ch, $conn)
{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
while (count($ch->callbacks)) {
$ch->wait();
}
在处理步骤中调用 sleep()
无效。
有什么建议吗?
好的,知道了。 (:
启发式的工作原理如下:
- 在所需的时间间隔内通过 cronjob 启动 php 消费者进程。
消息处理完毕后取消消费者。
... function process_message($msg) { //do something .. //ack msg $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //cancel consumer $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); } ...
启动一个消费者进程后,检查是否已经有另一个消费者进程运行。如果是这样,请中断以避免多个并行过程。只需检查
中的 return 值queue_declare()
:... list(,,$consumerCount) = $ch->queue_declare($rbmqAmazonInit, false, true, false, false); if ($consumerCount >= 1) { //another consumer is already runing exit; } ...