如何在 Laravel 队列中容纳 Amazon FIFO SQS?

How to accommodate Amazon FIFO SQS in Laravel queue?

Amazon 已经宣布了他们的 new FIFO SQS service,我想在 Laravel 队列中使用它来解决一些并发问题。

我创建了几个新队列并更改了配置。但是,我收到一个 MissingParameter 错误,上面写着

The request must contain the parameter MessageGroupId.

所以我修改了文件vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php

public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
    ])->get('MessageId');
}

我正在使用 APP_ENV 作为组 ID(这是一个单一的消息队列,所以实际上它并不重要。我只希望一切都是先进先出)。

但我仍然收到相同的错误消息。我该如何解决?任何帮助将不胜感激。

(顺便说一句,SDK在哪里定义的sendMessage?我可以找到它的存根,但我没有找到详细的实现)

除了 MessageGroupId,它还需要 MessageDeduplicationId 或启用基于内容的重复数据删除。

我想向可能遇到同样问题的其他人指出,尽管编辑 SqsQueue.php 有效,但它很容易被 composer installcomposer update 重置。另一种方法是为 SQS FIFO 实施新的 Illuminate\Queue\Connectors\ConnectorInterface,然后将其添加到 Laravel 的队列管理器。

我的做法如下:

  1. 创建一个扩展 Illuminate\Queue\SqsQueue 但支持 SQS FIFO 的新 SqsFifoQueue class。
  2. 创建一个新的 SqsFifoConnector class 来扩展 Illuminate\Queue\Connectors\SqsConnector,它将使用 SqsFifoQueue.
  3. 建立连接
  4. 创建一个新的 SqsFifoServiceProvider,将 SqsFifoConnector 注册到 Laravel 的队列管理器。
  5. SqsFifoServiceProvider 添加到您的 config/app.php
  6. 更新 config/queue.php 以使用新的 SQS FIFO 队列驱动程序。

示例:

  1. 创建一个扩展 Illuminate\Queue\SqsQueue 但支持 SQS FIFO 的新 SqsFifoQueue class。

    <?php
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
    {
        public function pushRaw($payload, $queue = null, array $options = [])
        {
            $response = $this->sqs->sendMessage([
                'QueueUrl' => $this->getQueue($queue),
                'MessageBody' => $payload,
                'MessageGroupId' => uniqid(),
                'MessageDeduplicationId' => uniqid(),
            ]);
    
            return $response->get('MessageId');
        }
    }
    
  2. 创建一个扩展 Illuminate\Queue\Connectors\SqsConnector 的新 SqsFifoConnector class,它将使用 SqsFifoQueue.

    建立连接
    <?php
    
    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);
    
            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret']);
            }
    
            return new SqsFifoQueue(
                new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
            );
        }
    }
    
  3. 创建一个新的 SqsFifoServiceProvider,将 SqsFifoConnector 注册到 Laravel 的队列管理器。

    <?php
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
    
  4. SqsFifoServiceProvider 添加到您的 config/app.php

    <?php
    
    return [
        'providers'     => [
            ...
            SqsFifoServiceProvider::class,
        ],
    ];
    
  5. 更新 config/queue.php 以使用新的 SQS FIFO 队列驱动程序。

    <?php
    
    return [
    
        'default' => 'sqsfifo',
    
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key'    => 'my_key'
                'secret' => 'my_secret',
                'queue'  => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
    

那么您的队列现在应该支持 SQS FIFO 队列。

无耻插件:在执行上述步骤时,我创建了一个 laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.

FIFO 消息的工作方式与标准 AWS SQS 队列不同。

您需要一个单独的驱动程序来处理 FIFO 队列。

我不得不面对同样的情况,下面的包裹是我的救星。

https://packagist.org/packages/shiftonelabs/laravel-sqs-fifo-queue

queue.php

'sqs-fifo' => [
            'driver' => 'sqs-fifo',
            'key' => env('SQS_KEY'),
            'secret' => env('SQS_SECRET'),
            'prefix' => env('SQS_PREFIX'),
            'queue' => env('SQS_QUEUE'),
            'region' => env('SQS_REGION'),
            'group' => 'default',
            'deduplicator' => 'unique',
        ],

然后

dispatch(new TestJob([]))->onQueue('My_Mail_Queue.fifo');

注意: 您需要在 .env

中指定要在应用程序中使用的默认队列名称
SQS_QUEUE=My_Default_queue.fifo

此外,您需要在侦听器中指定要在应用程序中使用的所有队列名称。 (如果整个应用使用相同的队列名称,则不需要在监听器中指定队列名称)

php artisan queue:listen --queue=My_Default_queue.fifo,My_Mail_Queue.fifo,My_Message_Queue.fifo