如何在 Laravel 队列中容纳 Amazon FIFO SQS?
How to accommodate Amazon FIFO SQS in Laravel queue?
Amazon 已经宣布了他们的 new FIFO SQS service,我想在 Laravel 队列中使用它来解决一些并发问题。
我创建了几个新队列并更改了配置。但是,我收到一个 MissingParameter
错误,上面写着
The request must contain the parameter MessageGroupId.
所以我修改了文件vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);
return $response->get('MessageId');
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
])->get('MessageId');
}
我正在使用 APP_ENV
作为组 ID(这是一个单一的消息队列,所以实际上它并不重要。我只希望一切都是先进先出)。
但我仍然收到相同的错误消息。我该如何解决?任何帮助将不胜感激。
(顺便说一句,SDK在哪里定义的sendMessage
?我可以找到它的存根,但我没有找到详细的实现)
除了 MessageGroupId
,它还需要 MessageDeduplicationId
或启用基于内容的重复数据删除。
我想向可能遇到同样问题的其他人指出,尽管编辑 SqsQueue.php
有效,但它很容易被 composer install
或 composer update
重置。另一种方法是为 SQS FIFO 实施新的 Illuminate\Queue\Connectors\ConnectorInterface
,然后将其添加到 Laravel 的队列管理器。
我的做法如下:
- 创建一个扩展
Illuminate\Queue\SqsQueue
但支持 SQS FIFO 的新 SqsFifoQueue
class。
- 创建一个新的
SqsFifoConnector
class 来扩展 Illuminate\Queue\Connectors\SqsConnector
,它将使用 SqsFifoQueue
. 建立连接
- 创建一个新的
SqsFifoServiceProvider
,将 SqsFifoConnector
注册到 Laravel 的队列管理器。
- 将
SqsFifoServiceProvider
添加到您的 config/app.php
。
- 更新
config/queue.php
以使用新的 SQS FIFO 队列驱动程序。
示例:
创建一个扩展 Illuminate\Queue\SqsQueue
但支持 SQS FIFO 的新 SqsFifoQueue
class。
<?php
class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'MessageGroupId' => uniqid(),
'MessageDeduplicationId' => uniqid(),
]);
return $response->get('MessageId');
}
}
创建一个扩展 Illuminate\Queue\Connectors\SqsConnector
的新 SqsFifoConnector
class,它将使用 SqsFifoQueue
.
建立连接
<?php
use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;
class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
public function connect(array $config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret']);
}
return new SqsFifoQueue(
new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
);
}
}
创建一个新的 SqsFifoServiceProvider
,将 SqsFifoConnector
注册到 Laravel 的队列管理器。
<?php
class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
{
public function register()
{
$this->app->afterResolving('queue', function ($manager) {
$manager->addConnector('sqsfifo', function () {
return new SqsFifoConnector;
});
});
}
}
将 SqsFifoServiceProvider
添加到您的 config/app.php
。
<?php
return [
'providers' => [
...
SqsFifoServiceProvider::class,
],
];
更新 config/queue.php
以使用新的 SQS FIFO 队列驱动程序。
<?php
return [
'default' => 'sqsfifo',
'connections' => [
'sqsfifo' => [
'driver' => 'sqsfifo',
'key' => 'my_key'
'secret' => 'my_secret',
'queue' => 'my_queue_url',
'region' => 'my_sqs_region',
],
],
];
那么您的队列现在应该支持 SQS FIFO 队列。
无耻插件:在执行上述步骤时,我创建了一个 laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.
FIFO 消息的工作方式与标准 AWS SQS 队列不同。
您需要一个单独的驱动程序来处理 FIFO 队列。
我不得不面对同样的情况,下面的包裹是我的救星。
https://packagist.org/packages/shiftonelabs/laravel-sqs-fifo-queue
在queue.php
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE'),
'region' => env('SQS_REGION'),
'group' => 'default',
'deduplicator' => 'unique',
],
然后
dispatch(new TestJob([]))->onQueue('My_Mail_Queue.fifo');
注意:
您需要在 .env
中指定要在应用程序中使用的默认队列名称
SQS_QUEUE=My_Default_queue.fifo
此外,您需要在侦听器中指定要在应用程序中使用的所有队列名称。 (如果整个应用使用相同的队列名称,则不需要在监听器中指定队列名称)
php artisan queue:listen --queue=My_Default_queue.fifo,My_Mail_Queue.fifo,My_Message_Queue.fifo
Amazon 已经宣布了他们的 new FIFO SQS service,我想在 Laravel 队列中使用它来解决一些并发问题。
我创建了几个新队列并更改了配置。但是,我收到一个 MissingParameter
错误,上面写着
The request must contain the parameter MessageGroupId.
所以我修改了文件vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);
return $response->get('MessageId');
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
])->get('MessageId');
}
我正在使用 APP_ENV
作为组 ID(这是一个单一的消息队列,所以实际上它并不重要。我只希望一切都是先进先出)。
但我仍然收到相同的错误消息。我该如何解决?任何帮助将不胜感激。
(顺便说一句,SDK在哪里定义的sendMessage
?我可以找到它的存根,但我没有找到详细的实现)
除了 MessageGroupId
,它还需要 MessageDeduplicationId
或启用基于内容的重复数据删除。
我想向可能遇到同样问题的其他人指出,尽管编辑 SqsQueue.php
有效,但它很容易被 composer install
或 composer update
重置。另一种方法是为 SQS FIFO 实施新的 Illuminate\Queue\Connectors\ConnectorInterface
,然后将其添加到 Laravel 的队列管理器。
我的做法如下:
- 创建一个扩展
Illuminate\Queue\SqsQueue
但支持 SQS FIFO 的新SqsFifoQueue
class。 - 创建一个新的
SqsFifoConnector
class 来扩展Illuminate\Queue\Connectors\SqsConnector
,它将使用SqsFifoQueue
. 建立连接
- 创建一个新的
SqsFifoServiceProvider
,将SqsFifoConnector
注册到 Laravel 的队列管理器。 - 将
SqsFifoServiceProvider
添加到您的config/app.php
。 - 更新
config/queue.php
以使用新的 SQS FIFO 队列驱动程序。
示例:
创建一个扩展
Illuminate\Queue\SqsQueue
但支持 SQS FIFO 的新SqsFifoQueue
class。<?php class SqsFifoQueue extends \Illuminate\Queue\SqsQueue { public function pushRaw($payload, $queue = null, array $options = []) { $response = $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'MessageGroupId' => uniqid(), 'MessageDeduplicationId' => uniqid(), ]); return $response->get('MessageId'); } }
创建一个扩展
建立连接Illuminate\Queue\Connectors\SqsConnector
的新SqsFifoConnector
class,它将使用SqsFifoQueue
.<?php use Aws\Sqs\SqsClient; use Illuminate\Support\Arr; class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector { public function connect(array $config) { $config = $this->getDefaultConfiguration($config); if ($config['key'] && $config['secret']) { $config['credentials'] = Arr::only($config, ['key', 'secret']); } return new SqsFifoQueue( new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '') ); } }
创建一个新的
SqsFifoServiceProvider
,将SqsFifoConnector
注册到 Laravel 的队列管理器。<?php class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider { public function register() { $this->app->afterResolving('queue', function ($manager) { $manager->addConnector('sqsfifo', function () { return new SqsFifoConnector; }); }); } }
将
SqsFifoServiceProvider
添加到您的config/app.php
。<?php return [ 'providers' => [ ... SqsFifoServiceProvider::class, ], ];
更新
config/queue.php
以使用新的 SQS FIFO 队列驱动程序。<?php return [ 'default' => 'sqsfifo', 'connections' => [ 'sqsfifo' => [ 'driver' => 'sqsfifo', 'key' => 'my_key' 'secret' => 'my_secret', 'queue' => 'my_queue_url', 'region' => 'my_sqs_region', ], ], ];
那么您的队列现在应该支持 SQS FIFO 队列。
无耻插件:在执行上述步骤时,我创建了一个 laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.
FIFO 消息的工作方式与标准 AWS SQS 队列不同。
您需要一个单独的驱动程序来处理 FIFO 队列。
我不得不面对同样的情况,下面的包裹是我的救星。
https://packagist.org/packages/shiftonelabs/laravel-sqs-fifo-queue
在queue.php
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE'),
'region' => env('SQS_REGION'),
'group' => 'default',
'deduplicator' => 'unique',
],
然后
dispatch(new TestJob([]))->onQueue('My_Mail_Queue.fifo');
注意:
您需要在 .env
SQS_QUEUE=My_Default_queue.fifo
此外,您需要在侦听器中指定要在应用程序中使用的所有队列名称。 (如果整个应用使用相同的队列名称,则不需要在监听器中指定队列名称)
php artisan queue:listen --queue=My_Default_queue.fifo,My_Mail_Queue.fifo,My_Message_Queue.fifo